NIFI-12983 Qdrant vector store support

Co-authored-by: Pierre Villard <pierre.villard.fr@gmail.com>
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8590.
This commit is contained in:
Anush008 2024-04-01 11:25:13 +05:30 committed by Pierre Villard
parent 406dbced3b
commit a56572d3b2
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
4 changed files with 496 additions and 0 deletions

View File

@ -0,0 +1,174 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from langchain.vectorstores.qdrant import Qdrant
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
from nifiapi.properties import (
PropertyDescriptor,
StandardValidators,
ExpressionLanguageScope,
)
import json
from EmbeddingUtils import (
create_embedding_service,
)
from nifiapi.documentation import use_case
from qdrant_client.models import Distance
import QdrantUtils
@use_case(
description="Create embeddings that semantically represent text content and upload to Qdrant - https://qdrant.tech/",
notes="This processor assumes that the data has already been formatted in JSONL format with the text to store in Qdrant provided in the 'text' field.",
keywords=["qdrant", "embedding", "vector", "text", "vectorstore", "insert"],
configuration="""
Configure 'Collection Name' to the name of the Qdrant collection to use.
Configure 'Qdrant URL' to the fully qualified URL of the Qdrant instance.
Configure 'Qdrant API Key' to the API Key to use in order to authenticate with Qdrant.
Configure 'Prefer gRPC' to True if you want to use gRPC for interfacing with Qdrant.
Configure 'Use HTTPS' to True if you want to use TLS(HTTPS) while interfacing with Qdrant.
Configure 'Embedding Model' to indicate whether OpenAI embeddings should be used or a HuggingFace embedding model should be used: 'Hugging Face Model' or 'OpenAI Model'
Configure 'HuggingFace API Key' or 'OpenAI API Key', depending on the chosen Embedding Model.
Configure 'HuggingFace Model' or 'OpenAI Model' to the name of the model to use.
Configure 'Force Recreate Collection' to True if you want to recreate the collection if it already exists.
Configure 'Similarity Metric' to the similarity metric to use when querying Qdrant.
If the documents to send to Qdrant contain a unique identifier(UUID), set the 'Document ID Field Name' property to the name of the field that contains the document ID.
This property can be left blank, in which case a UUID will be generated based on the FlowFile's filename.
""",
)
class PutQdrant(FlowFileTransform):
class Java:
implements = ["org.apache.nifi.python.processor.FlowFileTransform"]
class ProcessorDetails:
version = "@project.version@"
description = """Publishes JSON data to Qdrant. The Incoming data must be in single JSON per Line format, each with two keys: 'text' and 'metadata'.
The text must be a string, while metadata must be a map with strings for values. Any additional fields will be ignored."""
tags = [
"qdrant",
"vector",
"vectordb",
"vectorstore",
"embeddings",
"ai",
"artificial intelligence",
"ml",
"machine learning",
"text",
"LLM",
]
DOC_ID_FIELD_NAME = PropertyDescriptor(
name="Document ID Field Name",
description="""Specifies the name of the field in the 'metadata' element of each document where the document's ID can be found.
If not specified, a UUID will be generated based on the FlowFile's filename and an incremental number.""",
required=False,
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
)
FORCE_RECREATE_COLLECTION = PropertyDescriptor(
name="Force Recreate Collection",
description="Specifies whether to recreate the collection if it already exists. Essentially clearing the existing data.",
required=True,
default_value=False,
allowable_values=["True", "False"],
validators=[StandardValidators.BOOLEAN_VALIDATOR],
)
SIMILARITY_METRIC = PropertyDescriptor(
name="Similarity Metric",
description="Specifies the similarity metric when creating the collection.",
required=True,
default_value=Distance.COSINE,
allowable_values=[
Distance.COSINE,
Distance.EUCLID,
Distance.DOT,
Distance.MANHATTAN,
],
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
)
properties = (
QdrantUtils.QDRANT_PROPERTIES
+ QdrantUtils.EMBEDDING_MODEL_PROPERTIES
+ [
FORCE_RECREATE_COLLECTION,
SIMILARITY_METRIC,
DOC_ID_FIELD_NAME,
]
)
def __init__(self, **kwargs):
pass
def getPropertyDescriptors(self):
return self.properties
def onScheduled(self, context):
# The Qdrant#construct_instance() internally checks if the collection exists
# and creates it if it doesn't with the appropriate dimesions and configurations.
self.vector_store = Qdrant.construct_instance(
texts=[
"Some text to obtain the embeddings dimension when creating the collection"
],
embedding=create_embedding_service(context),
collection_name=context.getProperty(QdrantUtils.COLLECTION_NAME).getValue(),
url=context.getProperty(QdrantUtils.QDRANT_URL).getValue(),
api_key=context.getProperty(QdrantUtils.QDRANT_API_KEY).getValue(),
prefer_grpc=context.getProperty(QdrantUtils.PREFER_GRPC).asBoolean(),
https=context.getProperty(QdrantUtils.HTTPS).asBoolean(),
force_recreate=context.getProperty(
self.FORCE_RECREATE_COLLECTION
).asBoolean(),
distance_func=context.getProperty(self.SIMILARITY_METRIC).getValue(),
)
def transform(self, context, flowfile):
id_field_name = (
context.getProperty(self.DOC_ID_FIELD_NAME)
.evaluateAttributeExpressions(flowfile)
.getValue()
)
# Read the FlowFile content as "json-lines".
json_lines = flowfile.getContentsAsBytes().decode()
i = 1
texts, metadatas, ids = [], [], []
for line in json_lines.split("\n"):
try:
doc = json.loads(line)
except Exception as e:
raise ValueError(f"Could not parse line {i} as JSON") from e
metadata = doc.get("metadata")
texts.append(doc.get("text"))
metadatas.append(metadata)
doc_id = None
if id_field_name is not None:
doc_id = metadata.get(id_field_name)
if doc_id is None:
doc_id = QdrantUtils.convert_id(
flowfile.getAttribute("filename") + "-" + str(i)
)
ids.append(doc_id)
i += 1
self.vector_store.add_texts(texts=texts, metadatas=metadatas, ids=ids)
return FlowFileTransformResult(relationship="success")

View File

@ -0,0 +1,125 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from nifiapi.properties import (
PropertyDescriptor,
StandardValidators,
ExpressionLanguageScope,
PropertyDependency,
)
from EmbeddingUtils import (
OPENAI,
HUGGING_FACE,
EMBEDDING_MODEL,
)
import uuid
DEFAULT_COLLECTION_NAME = "apache-nifi"
COLLECTION_NAME = PropertyDescriptor(
name="Collection Name",
description="The name of the Qdrant collection to use.",
sensitive=False,
required=True,
default_value=DEFAULT_COLLECTION_NAME,
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
)
QDRANT_URL = PropertyDescriptor(
name="Qdrant URL",
description="The fully qualified URL to the Qdrant instance.",
sensitive=False,
required=True,
default_value="http://localhost:6333",
validators=[StandardValidators.URL_VALIDATOR],
)
QDRANT_API_KEY = PropertyDescriptor(
name="Qdrant API Key",
description="The API Key to use in order to authentication with Qdrant. Can be empty.",
sensitive=True,
required=True,
)
PREFER_GRPC = PropertyDescriptor(
name="Prefer gRPC",
description="Specifies whether to use gRPC for interfacing with Qdrant.",
required=True,
default_value=False,
allowable_values=["True", "False"],
validators=[StandardValidators.BOOLEAN_VALIDATOR],
)
HTTPS = PropertyDescriptor(
name="Use HTTPS",
description="Specifies whether to TLS(HTTPS) while interfacing with Qdrant.",
required=True,
default_value=False,
allowable_values=["True", "False"],
validators=[StandardValidators.BOOLEAN_VALIDATOR],
)
QDRANT_PROPERTIES = [COLLECTION_NAME, QDRANT_URL, QDRANT_API_KEY, PREFER_GRPC, HTTPS]
HUGGING_FACE_API_KEY = PropertyDescriptor(
name="HuggingFace API Key",
description="The API Key for interacting with HuggingFace",
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
required=True,
sensitive=True,
dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)],
)
HUGGING_FACE_MODEL = PropertyDescriptor(
name="HuggingFace Model",
description="The name of the HuggingFace model to use.",
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
required=True,
default_value="sentence-transformers/all-MiniLM-L6-v2",
dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)],
)
OPENAI_API_KEY = PropertyDescriptor(
name="OpenAI API Key",
description="The API Key for OpenAI in order to create embeddings.",
sensitive=True,
required=True,
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)],
)
OPENAI_API_MODEL = PropertyDescriptor(
name="OpenAI Model",
description="The name of the OpenAI model to use.",
required=True,
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
default_value="text-embedding-ada-002",
dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)],
)
EMBEDDING_MODEL_PROPERTIES = [
EMBEDDING_MODEL,
HUGGING_FACE_API_KEY,
HUGGING_FACE_MODEL,
OPENAI_API_KEY,
OPENAI_API_MODEL,
]
def convert_id(_id: str) -> str:
"""
Converts any string into a UUID string deterministically.
Qdrant accepts UUID strings and unsigned integers as point ID.
This allows us to overwrite the same point with the original ID.
"""
return str(uuid.uuid5(uuid.NAMESPACE_DNS, _id))

View File

@ -0,0 +1,192 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from langchain.vectorstores.qdrant import Qdrant
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
from nifiapi.properties import (
PropertyDescriptor,
StandardValidators,
ExpressionLanguageScope,
)
import QueryUtils
import json
from EmbeddingUtils import (
create_embedding_service,
)
from nifiapi.documentation import use_case
from qdrant_client import QdrantClient
import QdrantUtils
@use_case(
description="Semantically search for documents stored in Qdrant - https://qdrant.tech/",
keywords=["qdrant", "embedding", "vector", "text", "vectorstore", "search"],
configuration="""
Configure 'Collection Name' to the name of the Qdrant collection to use.
Configure 'Qdrant URL' to the fully qualified URL of the Qdrant instance.
Configure 'Qdrant API Key' to the API Key to use in order to authenticate with Qdrant.
Configure 'Prefer gRPC' to True if you want to use gRPC for interfacing with Qdrant.
Configure 'Use HTTPS' to True if you want to use TLS(HTTPS) while interfacing with Qdrant.
Configure 'Embedding Model' to indicate whether OpenAI embeddings should be used or a HuggingFace embedding model should be used: 'Hugging Face Model' or 'OpenAI Model'
Configure 'HuggingFace API Key' or 'OpenAI API Key', depending on the chosen Embedding Model.
Configure 'HuggingFace Model' or 'OpenAI Model' to the name of the model to use.
Configure 'Query' to the text of the query to send to Qdrant.
Configure 'Number of Results' to the number of results to return from Qdrant.
Configure 'Metadata Filter' to apply an optional metadata filter with the query. For example: { "author": "john.doe" }
Configure 'Output Strategy' to indicate how the output should be formatted: 'Row-Oriented', 'Text', or 'Column-Oriented'.
Configure 'Results Field' to the name of the field to insert the results, if the input FlowFile is JSON Formatted,.
Configure 'Include Metadatas' to True if metadata should be included in the output.
Configure 'Include Distances' to True if distances should be included in the output.
""",
)
class QueryQdrant(FlowFileTransform):
class Java:
implements = ["org.apache.nifi.python.processor.FlowFileTransform"]
class ProcessorDetails:
version = "@project.version@"
description = "Queries Qdrant in order to gather a specified number of documents that are most closely related to the given query."
tags = [
"qdrant",
"vector",
"vectordb",
"vectorstore",
"embeddings",
"ai",
"artificial intelligence",
"ml",
"machine learning",
"text",
"LLM",
]
QUERY = PropertyDescriptor(
name="Query",
description="The text of the query to send to Qdrant.",
required=True,
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
)
NUMBER_OF_RESULTS = PropertyDescriptor(
name="Number of Results",
description="The number of results to return from Qdrant.",
required=True,
validators=[StandardValidators.POSITIVE_INTEGER_VALIDATOR],
default_value="10",
expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
)
FILTER = PropertyDescriptor(
name="Metadata Filter",
description='Optional metadata filter to apply with the query. For example: { "author": "john.doe" }',
required=False,
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
)
properties = (
QdrantUtils.QDRANT_PROPERTIES
+ QdrantUtils.EMBEDDING_MODEL_PROPERTIES
+ [
QUERY,
FILTER,
NUMBER_OF_RESULTS,
QueryUtils.OUTPUT_STRATEGY,
QueryUtils.RESULTS_FIELD,
QueryUtils.INCLUDE_METADATAS,
QueryUtils.INCLUDE_DISTANCES,
]
)
embeddings = None
query_utils = None
client = None
def __init__(self, **kwargs):
pass
def getPropertyDescriptors(self):
return self.properties
def onScheduled(self, context):
self.client = QdrantClient(
url=context.getProperty(QdrantUtils.QDRANT_URL).getValue(),
api_key=context.getProperty(QdrantUtils.QDRANT_API_KEY).getValue(),
prefer_grpc=context.getProperty(QdrantUtils.PREFER_GRPC).asBoolean(),
https=context.getProperty(QdrantUtils.HTTPS).asBoolean(),
)
self.embeddings = create_embedding_service(context)
self.query_utils = QueryUtils.QueryUtils(context)
def transform(self, context, flowfile):
collection_name = (
context.getProperty(QdrantUtils.COLLECTION_NAME)
.evaluateAttributeExpressions(flowfile)
.getValue()
)
query = (
context.getProperty(self.QUERY)
.evaluateAttributeExpressions(flowfile)
.getValue()
)
num_results = (
context.getProperty(self.NUMBER_OF_RESULTS)
.evaluateAttributeExpressions(flowfile)
.asInteger()
)
filter = (
context.getProperty(self.FILTER)
.evaluateAttributeExpressions(flowfile)
.getValue()
)
vector_store = Qdrant(
client=self.client,
collection_name=collection_name,
embeddings=self.embeddings,
)
results = vector_store.similarity_search_with_score(
query=query,
k=num_results,
filter=None if filter is None else json.loads(filter),
)
documents = []
for result in results:
documents.append(result[0].page_content)
if context.getProperty(QueryUtils.INCLUDE_METADATAS).asBoolean():
metadatas = []
for result in results:
metadatas.append(result[0].metadata)
else:
metadatas = None
if context.getProperty(QueryUtils.INCLUDE_DISTANCES).asBoolean():
distances = []
for result in results:
distances.append(result[1])
else:
distances = None
(output_contents, mime_type) = self.query_utils.create_json(
flowfile, documents, metadatas, None, distances, None
)
attributes = {"mime.type": mime_type}
return FlowFileTransformResult(
relationship="success", contents=output_contents, attributes=attributes
)

View File

@ -15,6 +15,8 @@
# Shared requirements
openai==1.9.0
tiktoken
langchain==0.1.11
# Chroma requirements
chromadb==0.4.22
@ -30,3 +32,6 @@ langchain==0.1.11
# OpenSearch requirements
opensearch-py==2.5.0
# Qdrant requirements
qdrant-client==1.9.1