NIFI-12831: Add PutOpenSearchVector and QueryOpenSearchVector processors

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8441.
This commit is contained in:
Mark Bathori 2024-02-21 15:13:47 +01:00 committed by Pierre Villard
parent f87a0f47ef
commit b608e5a2f0
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
4 changed files with 609 additions and 0 deletions

View File

@ -0,0 +1,142 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from nifiapi.properties import PropertyDescriptor, StandardValidators, ExpressionLanguageScope, PropertyDependency
from EmbeddingUtils import OPENAI, HUGGING_FACE, EMBEDDING_MODEL
# Space types
L2 = ("L2 (Euclidean distance)", "l2")
L1 = ("L1 (Manhattan distance)", "l1")
LINF = ("L-infinity (chessboard) distance", "linf")
COSINESIMIL = ("Cosine similarity", "cosinesimil")
HUGGING_FACE_API_KEY = PropertyDescriptor(
name="HuggingFace API Key",
description="The API Key for interacting with HuggingFace",
required=True,
sensitive=True,
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)]
)
HUGGING_FACE_MODEL = PropertyDescriptor(
name="HuggingFace Model",
description="The name of the HuggingFace model to use",
default_value="sentence-transformers/all-MiniLM-L6-v2",
required=True,
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)]
)
OPENAI_API_KEY = PropertyDescriptor(
name="OpenAI API Key",
description="The API Key for OpenAI in order to create embeddings",
required=True,
sensitive=True,
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)]
)
OPENAI_API_MODEL = PropertyDescriptor(
name="OpenAI Model",
description="The API Key for OpenAI in order to create embeddings",
default_value="text-embedding-ada-002",
required=True,
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)]
)
HTTP_HOST = PropertyDescriptor(
name="HTTP Host",
description="URL where OpenSearch is hosted.",
default_value="http://localhost:9200",
required=True,
validators=[StandardValidators.URL_VALIDATOR]
)
USERNAME = PropertyDescriptor(
name="Username",
description="The username to use for authenticating to OpenSearch server",
required=False,
validators=[StandardValidators.NON_EMPTY_VALIDATOR]
)
PASSWORD = PropertyDescriptor(
name="Password",
description="The password to use for authenticating to OpenSearch server",
required=False,
sensitive=True,
validators=[StandardValidators.NON_EMPTY_VALIDATOR]
)
INDEX_NAME = PropertyDescriptor(
name="Index Name",
description="The name of the OpenSearch index.",
sensitive=False,
required=True,
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
)
VECTOR_FIELD = PropertyDescriptor(
name="Vector Field Name",
description="The name of field in the document where the embeddings are stored. This field need to be a 'knn_vector' typed field.",
default_value="vector_field",
required=True,
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
)
TEXT_FIELD = PropertyDescriptor(
name="Text Field Name",
description="The name of field in the document where the text is stored.",
default_value="text",
required=True,
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
)
def create_authentication_params(context):
username = context.getProperty(USERNAME).getValue()
password = context.getProperty(PASSWORD).getValue()
params = {"verify_certs": "true"}
if username is not None and password is not None:
params["http_auth"] = (username, password)
return params
def parse_documents(json_lines, id_field_name, file_name):
import json
texts = []
metadatas = []
ids = []
for i, line in enumerate(json_lines.split("\n"), start=1):
try:
doc = json.loads(line)
except Exception as e:
raise ValueError(f"Could not parse line {i} as JSON") from e
text = doc.get('text')
metadata = doc.get('metadata')
texts.append(text)
# Remove any null values, or it will cause the embedding to fail
filtered_metadata = {key: value for key, value in metadata.items() if value is not None}
metadatas.append(filtered_metadata)
doc_id = None
if id_field_name is not None:
doc_id = metadata.get(id_field_name)
if doc_id is None:
doc_id = file_name + "-" + str(i)
ids.append(doc_id)
return {"texts": texts, "metadatas": metadatas, "ids": ids}

View File

@ -0,0 +1,245 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from langchain.vectorstores import OpenSearchVectorSearch
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
from nifiapi.properties import PropertyDescriptor, StandardValidators, ExpressionLanguageScope, PropertyDependency
from OpenSearchVectorUtils import (L2, L1, LINF, COSINESIMIL, OPENAI_API_KEY, OPENAI_API_MODEL, HUGGING_FACE_API_KEY,
HUGGING_FACE_MODEL, HTTP_HOST, USERNAME, PASSWORD, INDEX_NAME, VECTOR_FIELD,
TEXT_FIELD, create_authentication_params, parse_documents)
from EmbeddingUtils import EMBEDDING_MODEL, create_embedding_service
from nifiapi.documentation import use_case, ProcessorConfiguration
@use_case(description="Create vectors/embeddings that represent text content and send the vectors to OpenSearch",
notes="This use case assumes that the data has already been formatted in JSONL format with the text to store in OpenSearch provided in the 'text' field.",
keywords=["opensearch", "embedding", "vector", "text", "vectorstore", "insert"],
configuration="""
Configure the 'HTTP Host' to an appropriate URL where OpenSearch is accessible.
Configure 'Embedding Model' to indicate whether OpenAI embeddings should be used or a HuggingFace embedding model should be used: 'Hugging Face Model' or 'OpenAI Model'
Configure the 'OpenAI API Key' or 'HuggingFace API Key', depending on the chosen Embedding Model.
Set 'Index Name' to the name of your OpenSearch Index.
Set 'Vector Field Name' to the name of the field in the document which will store the vector data.
Set 'Text Field Name' to the name of the field in the document which will store the text data.
If the documents to send to OpenSearch contain a unique identifier, set the 'Document ID Field Name' property to the name of the field that contains the document ID.
This property can be left blank, in which case a unique ID will be generated based on the FlowFile's filename.
If the provided index does not exists in OpenSearch then the processor is capable to create it. The 'New Index Strategy' property defines
that the index needs to be created from the default template or it should be configured with custom values.
""")
@use_case(description="Update vectors/embeddings in OpenSearch",
notes="This use case assumes that the data has already been formatted in JSONL format with the text to store in OpenSearch provided in the 'text' field.",
keywords=["opensearch", "embedding", "vector", "text", "vectorstore", "update", "upsert"],
configuration="""
Configure the 'HTTP Host' to an appropriate URL where OpenSearch is accessible.
Configure 'Embedding Model' to indicate whether OpenAI embeddings should be used or a HuggingFace embedding model should be used: 'Hugging Face Model' or 'OpenAI Model'
Configure the 'OpenAI API Key' or 'HuggingFace API Key', depending on the chosen Embedding Model.
Set 'Index Name' to the name of your OpenSearch Index.
Set 'Vector Field Name' to the name of the field in the document which will store the vector data.
Set 'Text Field Name' to the name of the field in the document which will store the text data.
Set the 'Document ID Field Name' property to the name of the field that contains the identifier of the document in OpenSearch to update.
""")
class PutOpenSearchVector(FlowFileTransform):
class Java:
implements = ['org.apache.nifi.python.processor.FlowFileTransform']
class ProcessorDetails:
version = '@project.version@'
description = """Publishes JSON data to OpenSearch. The Incoming data must be in single JSON per Line format, each with two keys: 'text' and 'metadata'.
The text must be a string, while metadata must be a map with strings for values. Any additional fields will be ignored."""
tags = ["opensearch", "vector", "vectordb", "vectorstore", "embeddings", "ai", "artificial intelligence", "ml",
"machine learning", "text", "LLM"]
# Engine types
NMSLIB = ("nmslib (Non-Metric Space Library)", "nmslib")
FAISS = ("faiss (Facebook AI Similarity Search)", "faiss")
LUCENE = ("lucene", "lucene")
ENGINE_VALUES = dict([NMSLIB, FAISS, LUCENE])
# Space types
INNERPRODUCT = ("Inner product", "innerproduct")
NMSLIB_SPACE_TYPE_VALUES = dict([L2, L1, LINF, COSINESIMIL, INNERPRODUCT])
FAISS_SPACE_TYPE_VALUES = dict([L2, INNERPRODUCT])
LUCENE_SPACE_TYPE_VALUES = dict([L2, COSINESIMIL])
# New Index Mapping Strategy
DEFAULT_INDEX_MAPPING = "Default index mapping"
CUSTOM_INDEX_MAPPING = "Custom index mapping"
DOC_ID_FIELD_NAME = PropertyDescriptor(
name="Document ID Field Name",
description="""Specifies the name of the field in the 'metadata' element of each document where the document's ID can be found.
If not specified, an ID will be generated based on the FlowFile's filename and a one-up number.""",
required=False,
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
)
NEW_INDEX_STRATEGY = PropertyDescriptor(
name="New Index Strategy",
description="Specifies the Mapping strategy to use for new index creation. The default template values are the following: "
"{engine: nmslib, space_type: l2, ef_search: 512, ef_construction: 512, m: 16}",
allowable_values=[DEFAULT_INDEX_MAPPING, CUSTOM_INDEX_MAPPING],
default_value=DEFAULT_INDEX_MAPPING,
required=False,
)
ENGINE = PropertyDescriptor(
name="Engine",
description="The approximate k-NN library to use for indexing and search.",
allowable_values=ENGINE_VALUES.keys(),
default_value=NMSLIB[0],
required=False,
dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING)]
)
NMSLIB_SPACE_TYPE = PropertyDescriptor(
name="NMSLIB Space Type",
description="The vector space used to calculate the distance between vectors.",
allowable_values=NMSLIB_SPACE_TYPE_VALUES.keys(),
default_value=L2[0],
required=False,
dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING),
PropertyDependency(ENGINE, NMSLIB[0])]
)
FAISS_SPACE_TYPE = PropertyDescriptor(
name="FAISS Space Type",
description="The vector space used to calculate the distance between vectors.",
allowable_values=FAISS_SPACE_TYPE_VALUES.keys(),
default_value=L2[0],
required=False,
dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING),
PropertyDependency(ENGINE, FAISS[0])]
)
LUCENE_SPACE_TYPE = PropertyDescriptor(
name="Lucene Space Type",
description="The vector space used to calculate the distance between vectors.",
allowable_values=LUCENE_SPACE_TYPE_VALUES.keys(),
default_value=L2[0],
required=False,
dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING),
PropertyDependency(ENGINE, LUCENE[0])]
)
EF_SEARCH = PropertyDescriptor(
name="EF Search",
description="The size of the dynamic list used during k-NN searches. Higher values lead to more accurate but slower searches.",
default_value="512",
required=False,
validators=[StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR],
dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING)]
)
EF_CONSTRUCTION = PropertyDescriptor(
name="EF Construction",
description="The size of the dynamic list used during k-NN graph creation. Higher values lead to a more accurate graph but slower indexing speed.",
default_value="512",
required=False,
validators=[StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR],
dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING)]
)
M = PropertyDescriptor(
name="M",
description="The number of bidirectional links that the plugin creates for each new element. Increasing and "
"decreasing this value can have a large impact on memory consumption. Keep this value between 2 and 100.",
default_value="16",
required=False,
validators=[StandardValidators._standard_validators.createLongValidator(2, 100, True)],
dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING)]
)
properties = [EMBEDDING_MODEL,
OPENAI_API_KEY,
OPENAI_API_MODEL,
HUGGING_FACE_API_KEY,
HUGGING_FACE_MODEL,
HTTP_HOST,
USERNAME,
PASSWORD,
INDEX_NAME,
DOC_ID_FIELD_NAME,
VECTOR_FIELD,
TEXT_FIELD,
NEW_INDEX_STRATEGY,
ENGINE,
NMSLIB_SPACE_TYPE,
FAISS_SPACE_TYPE,
LUCENE_SPACE_TYPE,
EF_SEARCH,
EF_CONSTRUCTION,
M]
embeddings = None
def __init__(self, **kwargs):
pass
def getPropertyDescriptors(self):
return self.properties
def onScheduled(self, context):
self.embeddings = create_embedding_service(context)
def transform(self, context, flowfile):
file_name = flowfile.getAttribute("filename")
http_host = context.getProperty(HTTP_HOST).evaluateAttributeExpressions(flowfile).getValue()
index_name = context.getProperty(INDEX_NAME).evaluateAttributeExpressions(flowfile).getValue()
id_field_name = context.getProperty(self.DOC_ID_FIELD_NAME).evaluateAttributeExpressions(flowfile).getValue()
vector_field = context.getProperty(VECTOR_FIELD).evaluateAttributeExpressions(flowfile).getValue()
text_field = context.getProperty(TEXT_FIELD).evaluateAttributeExpressions(flowfile).getValue()
new_index_strategy = context.getProperty(self.NEW_INDEX_STRATEGY).evaluateAttributeExpressions().getValue()
params = {"vector_field": vector_field, "text_field": text_field}
params.update(create_authentication_params(context))
if new_index_strategy == self.CUSTOM_INDEX_MAPPING:
engine = context.getProperty(self.ENGINE).evaluateAttributeExpressions().getValue()
params["engine"] = self.ENGINE_VALUES.get(engine)
if engine == self.NMSLIB[0]:
space_type = context.getProperty(self.NMSLIB_SPACE_TYPE).evaluateAttributeExpressions().getValue()
params["space_type"] = self.NMSLIB_SPACE_TYPE_VALUES.get(space_type)
if engine == self.FAISS[0]:
space_type = context.getProperty(self.FAISS_SPACE_TYPE).evaluateAttributeExpressions().getValue()
params["space_type"] = self.FAISS_SPACE_TYPE_VALUES.get(space_type)
if engine == self.LUCENE[0]:
space_type = context.getProperty(self.LUCENE_SPACE_TYPE).evaluateAttributeExpressions().getValue()
params["space_type"] = self.LUCENE_SPACE_TYPE_VALUES.get(space_type)
ef_search = context.getProperty(self.EF_SEARCH).evaluateAttributeExpressions().asInteger()
params["ef_search"] = ef_search
ef_construction = context.getProperty(self.EF_CONSTRUCTION).evaluateAttributeExpressions().asInteger()
params["ef_construction"] = ef_construction
m = context.getProperty(self.M).evaluateAttributeExpressions().asInteger()
params["m"] = m
# Read the FlowFile content as "json-lines".
json_lines = flowfile.getContentsAsBytes().decode()
parsed_documents = parse_documents(json_lines, id_field_name, file_name)
vectorstore = OpenSearchVectorSearch(
opensearch_url=http_host,
index_name=index_name,
embedding_function=self.embeddings,
**params
)
vectorstore.add_texts(texts=parsed_documents["texts"],
metadatas=parsed_documents["metadatas"],
ids=parsed_documents["ids"],
**params
)
return FlowFileTransformResult(relationship="success")

View File

@ -0,0 +1,219 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from langchain.vectorstores import OpenSearchVectorSearch
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
from nifiapi.properties import PropertyDescriptor, StandardValidators, ExpressionLanguageScope, PropertyDependency
from OpenSearchVectorUtils import (L2, L1, LINF, COSINESIMIL, OPENAI_API_KEY, OPENAI_API_MODEL, HUGGING_FACE_API_KEY, HUGGING_FACE_MODEL, HTTP_HOST,
USERNAME, PASSWORD, INDEX_NAME, VECTOR_FIELD, TEXT_FIELD, create_authentication_params)
from QueryUtils import OUTPUT_STRATEGY, RESULTS_FIELD, INCLUDE_METADATAS, INCLUDE_DISTANCES, QueryUtils
import json
from EmbeddingUtils import EMBEDDING_MODEL, create_embedding_service
class QueryOpenSearchVector(FlowFileTransform):
class Java:
implements = ['org.apache.nifi.python.processor.FlowFileTransform']
class ProcessorDetails:
version = '@project.version@'
description = "Queries OpenSearch in order to gather a specified number of documents that are most closely related to the given query."
tags = ["opensearch", "vector", "vectordb", "vectorstore", "embeddings", "ai", "artificial intelligence", "ml",
"machine learning", "text", "LLM"]
# Search types
APPROXIMATE_SEARCH = ("Approximate Search", "approximate_search")
SCRIPT_SCORING_SEARCH = ("Script Scoring Search", "script_scoring")
PAINLESS_SCRIPTING_SEARCH = ("Painless Scripting Search", "painless_scripting")
SEARCH_TYPE_VALUES = dict([APPROXIMATE_SEARCH, SCRIPT_SCORING_SEARCH, PAINLESS_SCRIPTING_SEARCH])
# Script Scoring Search space types
HAMMINGBIT = ("Hamming distance", "hammingbit")
SCRIPT_SCORING_SPACE_TYPE_VALUES = dict([L2, L1, LINF, COSINESIMIL, HAMMINGBIT])
# Painless Scripting Search space types
L2_SQUARED = ("L2 (Euclidean distance)", "l2Squared")
L1_NORM = ("L1 (Manhattan distance)", "l1Norm")
COSINE_SIMILARITY = ("Cosine similarity", "cosineSimilarity")
PAINLESS_SCRIPTING_SPACE_TYPE_VALUES = dict([L2_SQUARED, L1_NORM, COSINE_SIMILARITY])
QUERY = PropertyDescriptor(
name="Query",
description="The text of the query to send to OpenSearch.",
required=True,
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
)
NUMBER_OF_RESULTS = PropertyDescriptor(
name="Number of Results",
description="The number of results to return from OpenSearch",
default_value="10",
required=True,
validators=[StandardValidators.POSITIVE_INTEGER_VALIDATOR],
expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
)
SEARCH_TYPE = PropertyDescriptor(
name="Search Type",
description="Specifies the type of the search to be performed.",
allowable_values=SEARCH_TYPE_VALUES.keys(),
default_value=APPROXIMATE_SEARCH[0],
required=True
)
SCRIPT_SCORING_SPACE_TYPE = PropertyDescriptor(
name="Script Scoring Space Type",
description="Used to measure the distance between two points in order to determine the k-nearest neighbors.",
allowable_values=SCRIPT_SCORING_SPACE_TYPE_VALUES.keys(),
default_value=L2[0],
required=False,
dependencies=[PropertyDependency(SEARCH_TYPE, SCRIPT_SCORING_SEARCH[0])]
)
PAINLESS_SCRIPTING_SPACE_TYPE = PropertyDescriptor(
name="Painless Scripting Space Type",
description="Used to measure the distance between two points in order to determine the k-nearest neighbors.",
allowable_values=PAINLESS_SCRIPTING_SPACE_TYPE_VALUES.keys(),
default_value=L2_SQUARED[0],
required=False,
dependencies=[PropertyDependency(SEARCH_TYPE, PAINLESS_SCRIPTING_SEARCH[0])]
)
BOOLEAN_FILTER = PropertyDescriptor(
name="Boolean Filter",
description="A Boolean filter is a post filter consists of a Boolean query that contains a k-NN query and a filter. "
"The value of the field must be a JSON representation of the filter.",
required=False,
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
dependencies=[PropertyDependency(SEARCH_TYPE, APPROXIMATE_SEARCH[0])]
)
EFFICIENT_FILTER = PropertyDescriptor(
name="Efficient Filter",
description="The Lucene Engine or Faiss Engine decides whether to perform an exact k-NN search with "
"pre-filtering or an approximate search with modified post-filtering. The value of the field must "
"be a JSON representation of the filter.",
required=False,
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
dependencies=[PropertyDependency(SEARCH_TYPE, APPROXIMATE_SEARCH[0])]
)
PRE_FILTER = PropertyDescriptor(
name="Pre Filter",
description="Script Score query to pre-filter documents before identifying nearest neighbors. The value of "
"the field must be a JSON representation of the filter.",
default_value="{\"match_all\": {}}",
required=False,
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
dependencies=[PropertyDependency(SEARCH_TYPE, SCRIPT_SCORING_SEARCH[0], PAINLESS_SCRIPTING_SEARCH[0])]
)
properties = [EMBEDDING_MODEL,
OPENAI_API_KEY,
OPENAI_API_MODEL,
HUGGING_FACE_API_KEY,
HUGGING_FACE_MODEL,
HTTP_HOST,
USERNAME,
PASSWORD,
INDEX_NAME,
QUERY,
VECTOR_FIELD,
TEXT_FIELD,
NUMBER_OF_RESULTS,
SEARCH_TYPE,
SCRIPT_SCORING_SPACE_TYPE,
PAINLESS_SCRIPTING_SPACE_TYPE,
BOOLEAN_FILTER,
EFFICIENT_FILTER,
PRE_FILTER,
OUTPUT_STRATEGY,
RESULTS_FIELD,
INCLUDE_METADATAS,
INCLUDE_DISTANCES]
embeddings = None
query_utils = None
def __init__(self, **kwargs):
pass
def getPropertyDescriptors(self):
return self.properties
def onScheduled(self, context):
# initialize embedding service
self.embeddings = create_embedding_service(context)
self.query_utils = QueryUtils(context)
def transform(self, context, flowfile):
http_host = context.getProperty(HTTP_HOST).evaluateAttributeExpressions(flowfile).getValue()
index_name = context.getProperty(INDEX_NAME).evaluateAttributeExpressions(flowfile).getValue()
query = context.getProperty(self.QUERY).evaluateAttributeExpressions(flowfile).getValue()
num_results = context.getProperty(self.NUMBER_OF_RESULTS).evaluateAttributeExpressions(flowfile).asInteger()
vector_field = context.getProperty(VECTOR_FIELD).evaluateAttributeExpressions(flowfile).getValue()
text_field = context.getProperty(TEXT_FIELD).evaluateAttributeExpressions(flowfile).getValue()
search_type = context.getProperty(self.SEARCH_TYPE).evaluateAttributeExpressions().getValue()
params = {"vector_field": vector_field,
"text_field": text_field,
"search_type": self.SEARCH_TYPE_VALUES.get(search_type)}
params.update(create_authentication_params(context))
if search_type == self.APPROXIMATE_SEARCH[0]:
boolean_filter = context.getProperty(self.BOOLEAN_FILTER).evaluateAttributeExpressions().getValue()
if boolean_filter is not None:
params["boolean_filter"] = json.loads(boolean_filter)
efficient_filter = context.getProperty(self.EFFICIENT_FILTER).evaluateAttributeExpressions().getValue()
if efficient_filter is not None:
params["efficient_filter"] = json.loads(efficient_filter)
else:
pre_filter = context.getProperty(self.PRE_FILTER).evaluateAttributeExpressions().getValue()
if pre_filter is not None:
params["pre_filter"] = json.loads(pre_filter)
if search_type == self.SCRIPT_SCORING_SEARCH[0]:
space_type = context.getProperty(self.SCRIPT_SCORING_SPACE_TYPE).evaluateAttributeExpressions().getValue()
params["space_type"] = self.SCRIPT_SCORING_SPACE_TYPE_VALUES.get(space_type)
elif search_type == self.PAINLESS_SCRIPTING_SEARCH[0]:
space_type = context.getProperty(self.PAINLESS_SCRIPTING_SPACE_TYPE).evaluateAttributeExpressions().getValue()
params["space_type"] = self.PAINLESS_SCRIPTING_SPACE_TYPE_VALUES.get(space_type)
vectorstore = OpenSearchVectorSearch(index_name=index_name,
embedding_function=self.embeddings,
opensearch_url=http_host,
**params
)
results = vectorstore.similarity_search_with_score(query=query, k=num_results, **params)
documents = []
for result in results:
documents.append(result[0].page_content)
if context.getProperty(INCLUDE_METADATAS):
metadatas = []
for result in results:
metadatas.append(result[0].metadata)
else:
metadatas = None
if context.getProperty(INCLUDE_DISTANCES):
distances = []
for result in results:
distances.append(result[1])
else:
distances = None
(output_contents, mime_type) = self.query_utils.create_json(flowfile, documents, metadatas, None, distances, None)
attributes = {"mime.type": mime_type}
return FlowFileTransformResult(relationship="success", contents=output_contents, attributes=attributes)

View File

@ -27,3 +27,6 @@ requests
pinecone-client==3.0.1 pinecone-client==3.0.1
tiktoken tiktoken
langchain==0.1.11 langchain==0.1.11
# OpenSearch requirements
opensearch-py==2.5.0