diff --git a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutQdrant.py b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutQdrant.py
new file mode 100644
index 0000000000..f7e2de1ea5
--- /dev/null
+++ b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutQdrant.py
@@ -0,0 +1,192 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from langchain.vectorstores.qdrant import Qdrant
+from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
+from nifiapi.properties import (
+    PropertyDescriptor,
+    StandardValidators,
+    ExpressionLanguageScope,
+)
+import json
+from EmbeddingUtils import (
+    create_embedding_service,
+)
+from nifiapi.documentation import use_case
+
+from qdrant_client.models import Distance
+
+import QdrantUtils
+
+
+@use_case(
+    description="Create embeddings that semantically represent text content and upload to Qdrant - https://qdrant.tech/",
+    notes="This processor assumes that the data has already been formatted in JSONL format with the text to store in Qdrant provided in the 'text' field.",
+    keywords=["qdrant", "embedding", "vector", "text", "vectorstore", "insert"],
+    configuration="""
+        Configure 'Collection Name' to the name of the Qdrant collection to use.
+        Configure 'Qdrant URL' to the fully qualified URL of the Qdrant instance.
+        Configure 'Qdrant API Key' to the API Key to use in order to authenticate with Qdrant.
+        Configure 'Prefer gRPC' to True if you want to use gRPC for interfacing with Qdrant.
+        Configure 'Use HTTPS' to True if you want to use TLS(HTTPS) while interfacing with Qdrant.
+        Configure 'Embedding Model' to indicate whether OpenAI embeddings should be used or a HuggingFace embedding model should be used: 'Hugging Face Model' or 'OpenAI Model'
+        Configure 'HuggingFace API Key' or 'OpenAI API Key', depending on the chosen Embedding Model.
+        Configure 'HuggingFace Model' or 'OpenAI Model' to the name of the model to use.
+        Configure 'Force Recreate Collection' to True if you want to recreate the collection if it already exists.
+        Configure 'Similarity Metric' to the similarity metric to use when querying Qdrant.
+
+        If the documents to send to Qdrant contain a unique identifier(UUID), set the 'Document ID Field Name' property to the name of the field that contains the document ID.
+        This property can be left blank, in which case a UUID will be generated based on the FlowFile's filename.
+        """,
+)
+class PutQdrant(FlowFileTransform):
+    class Java:
+        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]
+
+    class ProcessorDetails:
+        version = "@project.version@"
+        description = """Publishes JSON data to Qdrant. The Incoming data must be in single JSON per Line format, each with two keys: 'text' and 'metadata'.
+            The text must be a string, while metadata must be a map with strings for values. Any additional fields will be ignored."""
+        tags = [
+            "qdrant",
+            "vector",
+            "vectordb",
+            "vectorstore",
+            "embeddings",
+            "ai",
+            "artificial intelligence",
+            "ml",
+            "machine learning",
+            "text",
+            "LLM",
+        ]
+
+    DOC_ID_FIELD_NAME = PropertyDescriptor(
+        name="Document ID Field Name",
+        description="""Specifies the name of the field in the 'metadata' element of each document where the document's ID can be found.
+            If not specified, a UUID will be generated based on the FlowFile's filename and an incremental number.""",
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+    FORCE_RECREATE_COLLECTION = PropertyDescriptor(
+        name="Force Recreate Collection",
+        description="Specifies whether to recreate the collection if it already exists. Essentially clearing the existing data.",
+        required=True,
+        default_value="False",
+        allowable_values=["True", "False"],
+        validators=[StandardValidators.BOOLEAN_VALIDATOR],
+    )
+    SIMILARITY_METRIC = PropertyDescriptor(
+        name="Similarity Metric",
+        description="Specifies the similarity metric when creating the collection.",
+        required=True,
+        default_value=Distance.COSINE,
+        allowable_values=[
+            Distance.COSINE,
+            Distance.EUCLID,
+            Distance.DOT,
+            Distance.MANHATTAN,
+        ],
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    )
+
+    properties = (
+        QdrantUtils.QDRANT_PROPERTIES
+        + QdrantUtils.EMBEDDING_MODEL_PROPERTIES
+        + [
+            FORCE_RECREATE_COLLECTION,
+            SIMILARITY_METRIC,
+            DOC_ID_FIELD_NAME,
+        ]
+    )
+
+    def __init__(self, **kwargs):
+        """Accept and ignore any keyword arguments passed at construction."""
+        pass
+
+    def getPropertyDescriptors(self):
+        """Return the property descriptors supported by this processor."""
+        return self.properties
+
+    def onScheduled(self, context):
+        """Create the LangChain Qdrant vector store used by transform()."""
+        # The Qdrant#construct_instance() internally checks if the collection exists
+        # and creates it if it doesn't with the appropriate dimensions and configurations.
+        # NOTE: no FlowFile is available here, so 'Collection Name' is read as-is,
+        # without evaluating Expression Language against FlowFile attributes.
+        self.vector_store = Qdrant.construct_instance(
+            texts=[
+                "Some text to obtain the embeddings dimension when creating the collection"
+            ],
+            embedding=create_embedding_service(context),
+            collection_name=context.getProperty(QdrantUtils.COLLECTION_NAME).getValue(),
+            url=context.getProperty(QdrantUtils.QDRANT_URL).getValue(),
+            api_key=context.getProperty(QdrantUtils.QDRANT_API_KEY).getValue(),
+            prefer_grpc=context.getProperty(QdrantUtils.PREFER_GRPC).asBoolean(),
+            https=context.getProperty(QdrantUtils.HTTPS).asBoolean(),
+            force_recreate=context.getProperty(
+                self.FORCE_RECREATE_COLLECTION
+            ).asBoolean(),
+            distance_func=context.getProperty(self.SIMILARITY_METRIC).getValue(),
+        )
+
+    def transform(self, context, flowfile):
+        """Embed each JSON-lines document of the FlowFile and store it in Qdrant.
+
+        Every non-blank line must be a JSON object with a 'text' field and an
+        optional 'metadata' object. The document ID is taken from the metadata
+        field named by 'Document ID Field Name' when available; otherwise it is
+        derived from the FlowFile's filename and the line number.
+
+        Raises:
+            ValueError: if a non-blank line cannot be parsed as JSON.
+        """
+        id_field_name = (
+            context.getProperty(self.DOC_ID_FIELD_NAME)
+            .evaluateAttributeExpressions(flowfile)
+            .getValue()
+        )
+        filename = flowfile.getAttribute("filename")
+
+        # Read the FlowFile content as "json-lines".
+        json_lines = flowfile.getContentsAsBytes().decode()
+        texts, metadatas, ids = [], [], []
+        for i, line in enumerate(json_lines.split("\n"), start=1):
+            # Skip blank lines (e.g. the one produced by a trailing newline)
+            # instead of failing the whole FlowFile on them.
+            if not line.strip():
+                continue
+            try:
+                doc = json.loads(line)
+            except json.JSONDecodeError as e:
+                raise ValueError(f"Could not parse line {i} as JSON") from e
+
+            metadata = doc.get("metadata")
+            texts.append(doc.get("text"))
+            metadatas.append(metadata)
+
+            # 'metadata' may be absent, so only look the ID up when it exists.
+            doc_id = None
+            if id_field_name is not None and metadata is not None:
+                doc_id = metadata.get(id_field_name)
+            if doc_id is None:
+                doc_id = QdrantUtils.convert_id(f"{filename}-{i}")
+            ids.append(doc_id)
+
+        # Nothing to store: skip the round trip to the embedding service and Qdrant.
+        if texts:
+            self.vector_store.add_texts(texts=texts, metadatas=metadatas, ids=ids)
+        return FlowFileTransformResult(relationship="success")
diff --git a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QdrantUtils.py b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QdrantUtils.py
new file mode 100644
index 0000000000..9b4214b32c
--- /dev/null
+++ b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QdrantUtils.py
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from nifiapi.properties import (
+    PropertyDescriptor,
+    StandardValidators,
+    ExpressionLanguageScope,
+    PropertyDependency,
+)
+from EmbeddingUtils import (
+    OPENAI,
+    HUGGING_FACE,
+    EMBEDDING_MODEL,
+)
+
+import uuid
+
+DEFAULT_COLLECTION_NAME = "apache-nifi"
+
+
+COLLECTION_NAME = PropertyDescriptor(
+    name="Collection Name",
+    description="The name of the Qdrant collection to use.",
+    sensitive=False,
+    required=True,
+    default_value=DEFAULT_COLLECTION_NAME,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+)
+QDRANT_URL = PropertyDescriptor(
+    name="Qdrant URL",
+    description="The fully qualified URL to the Qdrant instance.",
+    sensitive=False,
+    required=True,
+    default_value="http://localhost:6333",
+    validators=[StandardValidators.URL_VALIDATOR],
+)
+QDRANT_API_KEY = PropertyDescriptor(
+    name="Qdrant API Key",
+    description="The API Key to use in order to authenticate with Qdrant. Can be empty.",
+    sensitive=True,
+    required=False,
+)
+
+PREFER_GRPC = PropertyDescriptor(
+    name="Prefer gRPC",
+    description="Specifies whether to use gRPC for interfacing with Qdrant.",
+    required=True,
+    default_value="False",
+    allowable_values=["True", "False"],
+    validators=[StandardValidators.BOOLEAN_VALIDATOR],
+)
+HTTPS = PropertyDescriptor(
+    name="Use HTTPS",
+    description="Specifies whether to use TLS (HTTPS) while interfacing with Qdrant.",
+    required=True,
+    default_value="False",
+    allowable_values=["True", "False"],
+    validators=[StandardValidators.BOOLEAN_VALIDATOR],
+)
+
+QDRANT_PROPERTIES = [COLLECTION_NAME, QDRANT_URL, QDRANT_API_KEY, PREFER_GRPC, HTTPS]
+
+HUGGING_FACE_API_KEY = PropertyDescriptor(
+    name="HuggingFace API Key",
+    description="The API Key for interacting with HuggingFace",
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    required=True,
+    sensitive=True,
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)],
+)
+HUGGING_FACE_MODEL = PropertyDescriptor(
+    name="HuggingFace Model",
+    description="The name of the HuggingFace model to use.",
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    required=True,
+    default_value="sentence-transformers/all-MiniLM-L6-v2",
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)],
+)
+OPENAI_API_KEY = PropertyDescriptor(
+    name="OpenAI API Key",
+    description="The API Key for OpenAI in order to create embeddings.",
+    sensitive=True,
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)],
+)
+OPENAI_API_MODEL = PropertyDescriptor(
+    name="OpenAI Model",
+    description="The name of the OpenAI model to use.",
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    default_value="text-embedding-ada-002",
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)],
+)
+
+EMBEDDING_MODEL_PROPERTIES = [
+    EMBEDDING_MODEL,
+    HUGGING_FACE_API_KEY,
+    HUGGING_FACE_MODEL,
+    OPENAI_API_KEY,
+    OPENAI_API_MODEL,
+]
+
+
+def convert_id(_id: str) -> str:
+    """
+    Converts any string into a UUID string deterministically.
+
+    Qdrant accepts UUID strings and unsigned integers as point ID.
+    This allows us to overwrite the same point with the original ID.
+    """
+    return str(uuid.uuid5(uuid.NAMESPACE_DNS, _id))
diff --git a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QueryQdrant.py b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QueryQdrant.py
new file mode 100644
index 0000000000..7e1b0fca49
--- /dev/null
+++ b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QueryQdrant.py
@@ -0,0 +1,191 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from langchain.vectorstores.qdrant import Qdrant
+from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
+from nifiapi.properties import (
+    PropertyDescriptor,
+    StandardValidators,
+    ExpressionLanguageScope,
+)
+import QueryUtils
+import json
+from EmbeddingUtils import (
+    create_embedding_service,
+)
+
+from nifiapi.documentation import use_case
+
+from qdrant_client import QdrantClient
+
+import QdrantUtils
+
+
+@use_case(
+    description="Semantically search for documents stored in Qdrant - https://qdrant.tech/",
+    keywords=["qdrant", "embedding", "vector", "text", "vectorstore", "search"],
+    configuration="""
+        Configure 'Collection Name' to the name of the Qdrant collection to use.
+        Configure 'Qdrant URL' to the fully qualified URL of the Qdrant instance.
+        Configure 'Qdrant API Key' to the API Key to use in order to authenticate with Qdrant.
+        Configure 'Prefer gRPC' to True if you want to use gRPC for interfacing with Qdrant.
+        Configure 'Use HTTPS' to True if you want to use TLS(HTTPS) while interfacing with Qdrant.
+        Configure 'Embedding Model' to indicate whether OpenAI embeddings should be used or a HuggingFace embedding model should be used: 'Hugging Face Model' or 'OpenAI Model'
+        Configure 'HuggingFace API Key' or 'OpenAI API Key', depending on the chosen Embedding Model.
+        Configure 'HuggingFace Model' or 'OpenAI Model' to the name of the model to use.
+        Configure 'Query' to the text of the query to send to Qdrant.
+        Configure 'Number of Results' to the number of results to return from Qdrant.
+        Configure 'Metadata Filter' to apply an optional metadata filter with the query. For example: { "author": "john.doe" }
+        Configure 'Output Strategy' to indicate how the output should be formatted: 'Row-Oriented', 'Text', or 'Column-Oriented'.
+        Configure 'Results Field' to the name of the field to insert the results, if the input FlowFile is JSON formatted.
+        Configure 'Include Metadatas' to True if metadata should be included in the output.
+        Configure 'Include Distances' to True if distances should be included in the output.
+        """,
+)
+class QueryQdrant(FlowFileTransform):
+    class Java:
+        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]
+
+    class ProcessorDetails:
+        version = "@project.version@"
+        description = "Queries Qdrant in order to gather a specified number of documents that are most closely related to the given query."
+        tags = [
+            "qdrant",
+            "vector",
+            "vectordb",
+            "vectorstore",
+            "embeddings",
+            "ai",
+            "artificial intelligence",
+            "ml",
+            "machine learning",
+            "text",
+            "LLM",
+        ]
+
+    QUERY = PropertyDescriptor(
+        name="Query",
+        description="The text of the query to send to Qdrant.",
+        required=True,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+    NUMBER_OF_RESULTS = PropertyDescriptor(
+        name="Number of Results",
+        description="The number of results to return from Qdrant.",
+        required=True,
+        validators=[StandardValidators.POSITIVE_INTEGER_VALIDATOR],
+        default_value="10",
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+    FILTER = PropertyDescriptor(
+        name="Metadata Filter",
+        description='Optional metadata filter to apply with the query. For example: { "author": "john.doe" }',
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+
+    properties = (
+        QdrantUtils.QDRANT_PROPERTIES
+        + QdrantUtils.EMBEDDING_MODEL_PROPERTIES
+        + [
+            QUERY,
+            FILTER,
+            NUMBER_OF_RESULTS,
+            QueryUtils.OUTPUT_STRATEGY,
+            QueryUtils.RESULTS_FIELD,
+            QueryUtils.INCLUDE_METADATAS,
+            QueryUtils.INCLUDE_DISTANCES,
+        ]
+    )
+
+    embeddings = None
+    query_utils = None
+    client = None
+
+    def __init__(self, **kwargs):
+        """Accept and ignore any keyword arguments passed at construction."""
+        pass
+
+    def getPropertyDescriptors(self):
+        """Return the property descriptors supported by this processor."""
+        return self.properties
+
+    def onScheduled(self, context):
+        """Create the Qdrant client, the embedding service and the QueryUtils helper."""
+        self.client = QdrantClient(
+            url=context.getProperty(QdrantUtils.QDRANT_URL).getValue(),
+            api_key=context.getProperty(QdrantUtils.QDRANT_API_KEY).getValue(),
+            prefer_grpc=context.getProperty(QdrantUtils.PREFER_GRPC).asBoolean(),
+            https=context.getProperty(QdrantUtils.HTTPS).asBoolean(),
+        )
+        self.embeddings = create_embedding_service(context)
+        self.query_utils = QueryUtils.QueryUtils(context)
+
+    def transform(self, context, flowfile):
+        """Run a similarity search against Qdrant and emit the results as JSON."""
+        collection_name = (
+            context.getProperty(QdrantUtils.COLLECTION_NAME)
+            .evaluateAttributeExpressions(flowfile)
+            .getValue()
+        )
+        query = (
+            context.getProperty(self.QUERY)
+            .evaluateAttributeExpressions(flowfile)
+            .getValue()
+        )
+        num_results = (
+            context.getProperty(self.NUMBER_OF_RESULTS)
+            .evaluateAttributeExpressions(flowfile)
+            .asInteger()
+        )
+        metadata_filter = (
+            context.getProperty(self.FILTER)
+            .evaluateAttributeExpressions(flowfile)
+            .getValue()
+        )
+        vector_store = Qdrant(
+            client=self.client,
+            collection_name=collection_name,
+            embeddings=self.embeddings,
+        )
+        results = vector_store.similarity_search_with_score(
+            query=query,
+            k=num_results,
+            filter=None if metadata_filter is None else json.loads(metadata_filter),
+        )
+
+        # Each result is a (document, score) pair.
+        documents = [doc.page_content for doc, _ in results]
+
+        if context.getProperty(QueryUtils.INCLUDE_METADATAS).asBoolean():
+            metadatas = [doc.metadata for doc, _ in results]
+        else:
+            metadatas = None
+
+        if context.getProperty(QueryUtils.INCLUDE_DISTANCES).asBoolean():
+            distances = [score for _, score in results]
+        else:
+            distances = None
+
+        (output_contents, mime_type) = self.query_utils.create_json(
+            flowfile, documents, metadatas, None, distances, None
+        )
+        attributes = {"mime.type": mime_type}
+
+        return FlowFileTransformResult(
+            relationship="success", contents=output_contents, attributes=attributes
+        )
diff --git a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt
index fbefc24508..fcaf25b839 100644
--- a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt
+++ b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt
@@ -15,6 +15,8 @@
 
 # Shared requirements
 openai==1.9.0
+tiktoken
+langchain==0.1.11
 
 # Chroma requirements
 chromadb==0.4.22
@@ -30,3 +32,6 @@ langchain==0.1.11
 
 # OpenSearch requirements
 opensearch-py==2.5.0
+
+# Qdrant requirements
+qdrant-client==1.9.1