# ****** WARNING: THIS FILE IS AUTOGENERATED ******
#
# This file is autogenerated. Do not edit it manually.
# To regenerate this file, use
#
# ```
# bash -x tools/generate_async_client.sh
# ```
#
# ****** WARNING: THIS FILE IS AUTOGENERATED ******
import itertools
import json
import logging
import os
import shutil
from io import TextIOWrapper
from typing import Any, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple, Union
import numpy as np
import portalocker
from qdrant_client._pydantic_compat import to_dict
from qdrant_client.async_client_base import AsyncQdrantBase
from qdrant_client.conversions import common_types as types
from qdrant_client.http import models as rest_models
from qdrant_client.http.models.models import RecommendExample
from qdrant_client.local.local_collection import LocalCollection
META_INFO_FILENAME = "meta.json"


class AsyncQdrantLocal(AsyncQdrantBase):
"""
Everything Qdrant server can do, but locally.
Use this implementation to run vector search without running a Qdrant server.
Everything that works with local Qdrant will work with server Qdrant as well.
Use for small-scale data, demos, and tests.
If you need more speed or size, use Qdrant server.
"""
def __init__(self, location: str, force_disable_check_same_thread: bool = False) -> None:
"""
Initialize local Qdrant.
Args:
location: Where to store data. Can be a path to a directory or `:memory:` for in-memory storage.
            force_disable_check_same_thread: Disable SQLite's check_same_thread safety check. Use only if you know what you are doing.
"""
super().__init__()
self.force_disable_check_same_thread = force_disable_check_same_thread
self.location = location
self.persistent = location != ":memory:"
self.collections: Dict[str, LocalCollection] = {}
self.aliases: Dict[str, str] = {}
self._flock_file: Optional[TextIOWrapper] = None
self._load()
self._closed: bool = False

    def closed(self) -> bool:
return self._closed

    async def close(self, **kwargs: Any) -> None:
self._closed = True
for collection in self.collections.values():
collection.close()
if self._flock_file is not None:
portalocker.unlock(self._flock_file)
def _load(self) -> None:
if not self.persistent:
return
meta_path = os.path.join(self.location, META_INFO_FILENAME)
if not os.path.exists(meta_path):
os.makedirs(self.location, exist_ok=True)
with open(meta_path, "w") as f:
f.write(json.dumps({"collections": {}, "aliases": {}}))
else:
with open(meta_path, "r") as f:
meta = json.load(f)
for collection_name, config_json in meta["collections"].items():
config = rest_models.CreateCollection(**config_json)
collection_path = self._collection_path(collection_name)
self.collections[collection_name] = LocalCollection(
config,
collection_path,
force_disable_check_same_thread=self.force_disable_check_same_thread,
)
self.aliases = meta["aliases"]
lock_file_path = os.path.join(self.location, ".lock")
if not os.path.exists(lock_file_path):
os.makedirs(self.location, exist_ok=True)
with open(lock_file_path, "w") as f:
f.write("tmp lock file")
self._flock_file = open(lock_file_path, "r+")
try:
portalocker.lock(
self._flock_file,
portalocker.LockFlags.EXCLUSIVE | portalocker.LockFlags.NON_BLOCKING,
)
except portalocker.exceptions.LockException:
raise RuntimeError(
f"Storage folder {self.location} is already accessed by another instance of Qdrant client. If you require concurrent access, use Qdrant server instead."
)
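    # Note on the locking above: _load() takes an exclusive, non-blocking
    # portalocker lock on <location>/.lock, so a second client pointed at the
    # same directory fails fast instead of silently corrupting storage. A
    # sketch of the expected failure (path is hypothetical):
    #
    #     a = AsyncQdrantLocal("./qdrant_data")
    #     b = AsyncQdrantLocal("./qdrant_data")  # raises RuntimeError: already accessed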
def _save(self) -> None:
if not self.persistent:
return
meta_path = os.path.join(self.location, META_INFO_FILENAME)
with open(meta_path, "w") as f:
f.write(
json.dumps(
{
"collections": {
collection_name: to_dict(collection.config)
for (collection_name, collection) in self.collections.items()
},
"aliases": self.aliases,
}
)
)
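    # _save() mirrors the in-memory registry to <location>/meta.json. The
    # persisted document has roughly this shape (collection config abbreviated,
    # names and values illustrative):
    #
    #     {
    #         "collections": {"demo": {"vectors": {"size": 4, "distance": "Cosine"}}},
    #         "aliases": {"demo_alias": "demo"}
    #     }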
def _get_collection(self, collection_name: str) -> LocalCollection:
if collection_name in self.collections:
return self.collections[collection_name]
if collection_name in self.aliases:
return self.collections[self.aliases[collection_name]]
raise ValueError(f"Collection {collection_name} not found")

    async def search_batch(
self, collection_name: str, requests: Sequence[types.SearchRequest], **kwargs: Any
) -> List[List[types.ScoredPoint]]:
collection = self._get_collection(collection_name)
return [
collection.search(
query_vector=request.vector,
query_filter=request.filter,
limit=request.limit,
offset=request.offset,
with_payload=request.with_payload,
with_vectors=request.with_vector,
score_threshold=request.score_threshold,
)
for request in requests
]
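    # Batch search sketch: each request carries its own vector, filter, and
    # limit. Assumes an async context and a collection named "demo" with
    # 4-dimensional vectors (all values illustrative):
    #
    #     results = await client.search_batch(
    #         collection_name="demo",
    #         requests=[
    #             rest_models.SearchRequest(vector=[0.1, 0.2, 0.3, 0.4], limit=3),
    #             rest_models.SearchRequest(vector=[0.9, 0.8, 0.7, 0.6], limit=3),
    #         ],
    #     )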

    async def search(
self,
collection_name: str,
query_vector: Union[
types.NumpyArray,
Sequence[float],
Tuple[str, List[float]],
types.NamedVector,
types.NamedSparseVector,
],
query_filter: Optional[types.Filter] = None,
search_params: Optional[types.SearchParams] = None,
limit: int = 10,
offset: Optional[int] = None,
with_payload: Union[bool, Sequence[str], types.PayloadSelector] = True,
with_vectors: Union[bool, Sequence[str]] = False,
score_threshold: Optional[float] = None,
**kwargs: Any,
) -> List[types.ScoredPoint]:
collection = self._get_collection(collection_name)
return collection.search(
query_vector=query_vector,
query_filter=query_filter,
limit=limit,
offset=offset,
with_payload=with_payload,
with_vectors=with_vectors,
score_threshold=score_threshold,
)
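    # Single search sketch under the same assumptions as above; the filter
    # restricts hits to a hypothetical "city" payload field:
    #
    #     hits = await client.search(
    #         collection_name="demo",
    #         query_vector=[0.1, 0.2, 0.3, 0.4],
    #         query_filter=rest_models.Filter(
    #             must=[
    #                 rest_models.FieldCondition(
    #                     key="city", match=rest_models.MatchValue(value="Berlin")
    #                 )
    #             ]
    #         ),
    #         limit=5,
    #     )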

    async def search_groups(
self,
collection_name: str,
query_vector: Union[
types.NumpyArray, Sequence[float], Tuple[str, List[float]], types.NamedVector
],
group_by: str,
query_filter: Optional[rest_models.Filter] = None,
search_params: Optional[rest_models.SearchParams] = None,
limit: int = 10,
group_size: int = 1,
with_payload: Union[bool, Sequence[str], rest_models.PayloadSelector] = True,
with_vectors: Union[bool, Sequence[str]] = False,
score_threshold: Optional[float] = None,
with_lookup: Optional[types.WithLookupInterface] = None,
**kwargs: Any,
) -> types.GroupsResult:
collection = self._get_collection(collection_name)
with_lookup_collection = None
if with_lookup is not None:
if isinstance(with_lookup, str):
with_lookup_collection = self._get_collection(with_lookup)
else:
with_lookup_collection = self._get_collection(with_lookup.collection)
return collection.search_groups(
query_vector=query_vector,
query_filter=query_filter,
limit=limit,
group_by=group_by,
group_size=group_size,
with_payload=with_payload,
with_vectors=with_vectors,
score_threshold=score_threshold,
with_lookup=with_lookup,
with_lookup_collection=with_lookup_collection,
)
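    # Grouping sketch: collapse hits by a payload key, keeping at most
    # `group_size` points per group ("document_id" is a hypothetical field):
    #
    #     groups = await client.search_groups(
    #         collection_name="demo",
    #         query_vector=[0.1, 0.2, 0.3, 0.4],
    #         group_by="document_id",
    #         limit=3,
    #         group_size=2,
    #     )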

    async def recommend_batch(
self, collection_name: str, requests: Sequence[types.RecommendRequest], **kwargs: Any
) -> List[List[types.ScoredPoint]]:
collection = self._get_collection(collection_name)
return [
collection.recommend(
positive=request.positive,
negative=request.negative,
query_filter=request.filter,
limit=request.limit,
offset=request.offset,
with_payload=request.with_payload,
with_vectors=request.with_vector,
score_threshold=request.score_threshold,
using=request.using,
lookup_from_collection=self._get_collection(request.lookup_from.collection)
if request.lookup_from
else None,
lookup_from_vector_name=request.lookup_from.vector
if request.lookup_from
else None,
strategy=request.strategy,
)
for request in requests
]

    async def recommend(
self,
collection_name: str,
positive: Optional[Sequence[RecommendExample]] = None,
negative: Optional[Sequence[RecommendExample]] = None,
query_filter: Optional[types.Filter] = None,
search_params: Optional[types.SearchParams] = None,
limit: int = 10,
offset: int = 0,
with_payload: Union[bool, List[str], types.PayloadSelector] = True,
with_vectors: Union[bool, List[str]] = False,
score_threshold: Optional[float] = None,
using: Optional[str] = None,
lookup_from: Optional[types.LookupLocation] = None,
strategy: Optional[types.RecommendStrategy] = None,
**kwargs: Any,
) -> List[types.ScoredPoint]:
collection = self._get_collection(collection_name)
return collection.recommend(
positive=positive,
negative=negative,
query_filter=query_filter,
limit=limit,
offset=offset,
with_payload=with_payload,
with_vectors=with_vectors,
score_threshold=score_threshold,
using=using,
lookup_from_collection=self._get_collection(lookup_from.collection)
if lookup_from
else None,
lookup_from_vector_name=lookup_from.vector if lookup_from else None,
strategy=strategy,
)
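    # Recommendation sketch: positive/negative examples may be existing point
    # ids or raw vectors (the ids below are hypothetical):
    #
    #     similar = await client.recommend(
    #         collection_name="demo", positive=[1, 7], negative=[4], limit=10
    #     )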

    async def recommend_groups(
self,
collection_name: str,
group_by: str,
positive: Optional[Sequence[Union[types.PointId, List[float]]]] = None,
negative: Optional[Sequence[Union[types.PointId, List[float]]]] = None,
query_filter: Optional[types.Filter] = None,
search_params: Optional[types.SearchParams] = None,
limit: int = 10,
group_size: int = 1,
score_threshold: Optional[float] = None,
with_payload: Union[bool, Sequence[str], types.PayloadSelector] = True,
with_vectors: Union[bool, Sequence[str]] = False,
using: Optional[str] = None,
lookup_from: Optional[types.LookupLocation] = None,
with_lookup: Optional[types.WithLookupInterface] = None,
strategy: Optional[types.RecommendStrategy] = None,
**kwargs: Any,
) -> types.GroupsResult:
collection = self._get_collection(collection_name)
with_lookup_collection = None
if with_lookup is not None:
if isinstance(with_lookup, str):
with_lookup_collection = self._get_collection(with_lookup)
else:
with_lookup_collection = self._get_collection(with_lookup.collection)
return collection.recommend_groups(
positive=positive,
negative=negative,
group_by=group_by,
group_size=group_size,
query_filter=query_filter,
limit=limit,
with_payload=with_payload,
with_vectors=with_vectors,
score_threshold=score_threshold,
using=using,
lookup_from_collection=self._get_collection(lookup_from.collection)
if lookup_from
else None,
lookup_from_vector_name=lookup_from.vector if lookup_from else None,
with_lookup=with_lookup,
with_lookup_collection=with_lookup_collection,
strategy=strategy,
)

    async def discover(
self,
collection_name: str,
target: Optional[types.TargetVector] = None,
context: Optional[Sequence[types.ContextExamplePair]] = None,
query_filter: Optional[types.Filter] = None,
search_params: Optional[types.SearchParams] = None,
limit: int = 10,
offset: int = 0,
with_payload: Union[bool, List[str], types.PayloadSelector] = True,
with_vectors: Union[bool, List[str]] = False,
using: Optional[str] = None,
lookup_from: Optional[types.LookupLocation] = None,
consistency: Optional[types.ReadConsistency] = None,
timeout: Optional[int] = None,
**kwargs: Any,
) -> List[types.ScoredPoint]:
collection = self._get_collection(collection_name)
return collection.discover(
target=target,
context=context,
query_filter=query_filter,
limit=limit,
offset=offset,
with_payload=with_payload,
with_vectors=with_vectors,
using=using,
lookup_from_collection=self._get_collection(lookup_from.collection)
if lookup_from
else None,
lookup_from_vector_name=lookup_from.vector if lookup_from else None,
)
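    # Discovery sketch: steer the search with a target plus positive/negative
    # context pairs (point ids are hypothetical):
    #
    #     found = await client.discover(
    #         collection_name="demo",
    #         target=1,
    #         context=[types.ContextExamplePair(positive=2, negative=3)],
    #         limit=10,
    #     )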

    async def discover_batch(
self, collection_name: str, requests: Sequence[types.DiscoverRequest], **kwargs: Any
) -> List[List[types.ScoredPoint]]:
collection = self._get_collection(collection_name)
return [
collection.discover(
target=request.target,
context=request.context,
query_filter=request.filter,
limit=request.limit,
offset=request.offset,
with_payload=request.with_payload,
with_vectors=request.with_vector,
using=request.using,
lookup_from_collection=self._get_collection(request.lookup_from.collection)
if request.lookup_from
else None,
lookup_from_vector_name=request.lookup_from.vector
if request.lookup_from
else None,
)
for request in requests
]

    async def count(
self,
collection_name: str,
count_filter: Optional[types.Filter] = None,
exact: bool = True,
**kwargs: Any,
) -> types.CountResult:
collection = self._get_collection(collection_name)
return collection.count(count_filter=count_filter)
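    # Count sketch. The local backend always counts exactly, so the `exact`
    # flag is accepted only for API compatibility:
    #
    #     total = await client.count(collection_name="demo")
    #     berlin = await client.count(
    #         collection_name="demo",
    #         count_filter=rest_models.Filter(
    #             must=[
    #                 rest_models.FieldCondition(
    #                     key="city", match=rest_models.MatchValue(value="Berlin")
    #                 )
    #             ]
    #         ),
    #     )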

    async def upsert(
self, collection_name: str, points: types.Points, **kwargs: Any
) -> types.UpdateResult:
collection = self._get_collection(collection_name)
collection.upsert(points)
return self._default_update_result()
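    # Upsert sketch (id, vector, and payload values are illustrative):
    #
    #     await client.upsert(
    #         collection_name="demo",
    #         points=[
    #             rest_models.PointStruct(
    #                 id=1, vector=[0.1, 0.2, 0.3, 0.4], payload={"city": "Berlin"}
    #             )
    #         ],
    #     )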

    async def update_vectors(
self, collection_name: str, points: Sequence[types.PointVectors], **kwargs: Any
) -> types.UpdateResult:
collection = self._get_collection(collection_name)
collection.update_vectors(points)
return self._default_update_result()

    async def delete_vectors(
self,
collection_name: str,
vectors: Sequence[str],
points: types.PointsSelector,
**kwargs: Any,
) -> types.UpdateResult:
collection = self._get_collection(collection_name)
collection.delete_vectors(vectors, points)
return self._default_update_result()

    async def retrieve(
self,
collection_name: str,
ids: Sequence[types.PointId],
with_payload: Union[bool, Sequence[str], types.PayloadSelector] = True,
with_vectors: Union[bool, Sequence[str]] = False,
**kwargs: Any,
) -> List[types.Record]:
collection = self._get_collection(collection_name)
return collection.retrieve(ids, with_payload, with_vectors)
@classmethod
def _default_update_result(cls, operation_id: int = 0) -> types.UpdateResult:
return types.UpdateResult(
operation_id=operation_id, status=rest_models.UpdateStatus.COMPLETED
)

    async def delete(
self, collection_name: str, points_selector: types.PointsSelector, **kwargs: Any
) -> types.UpdateResult:
collection = self._get_collection(collection_name)
collection.delete(points_selector)
return self._default_update_result()

    async def set_payload(
self,
collection_name: str,
payload: types.Payload,
points: types.PointsSelector,
**kwargs: Any,
) -> types.UpdateResult:
collection = self._get_collection(collection_name)
collection.set_payload(payload=payload, selector=points)
return self._default_update_result()

    async def overwrite_payload(
self,
collection_name: str,
payload: types.Payload,
points: types.PointsSelector,
**kwargs: Any,
) -> types.UpdateResult:
collection = self._get_collection(collection_name)
collection.overwrite_payload(payload=payload, selector=points)
return self._default_update_result()

    async def delete_payload(
self,
collection_name: str,
keys: Sequence[str],
points: types.PointsSelector,
**kwargs: Any,
) -> types.UpdateResult:
collection = self._get_collection(collection_name)
collection.delete_payload(keys=keys, selector=points)
return self._default_update_result()

    async def clear_payload(
self, collection_name: str, points_selector: types.PointsSelector, **kwargs: Any
) -> types.UpdateResult:
collection = self._get_collection(collection_name)
collection.clear_payload(selector=points_selector)
return self._default_update_result()

    async def batch_update_points(
self,
collection_name: str,
update_operations: Sequence[types.UpdateOperation],
**kwargs: Any,
) -> List[types.UpdateResult]:
collection = self._get_collection(collection_name)
collection.batch_update_points(update_operations)
return [self._default_update_result()] * len(update_operations)

    async def update_collection_aliases(
self, change_aliases_operations: Sequence[types.AliasOperations], **kwargs: Any
) -> bool:
for operation in change_aliases_operations:
if isinstance(operation, rest_models.CreateAliasOperation):
self._get_collection(operation.create_alias.collection_name)
self.aliases[
operation.create_alias.alias_name
] = operation.create_alias.collection_name
elif isinstance(operation, rest_models.DeleteAliasOperation):
self.aliases.pop(operation.delete_alias.alias_name, None)
elif isinstance(operation, rest_models.RenameAliasOperation):
new_name = operation.rename_alias.new_alias_name
old_name = operation.rename_alias.old_alias_name
self.aliases[new_name] = self.aliases.pop(old_name)
else:
raise ValueError(f"Unknown operation: {operation}")
self._save()
return True
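    # Alias sketch: register "demo_alias" so reads resolve to the hypothetical
    # "demo" collection:
    #
    #     await client.update_collection_aliases(
    #         change_aliases_operations=[
    #             rest_models.CreateAliasOperation(
    #                 create_alias=rest_models.CreateAlias(
    #                     collection_name="demo", alias_name="demo_alias"
    #                 )
    #             )
    #         ]
    #     )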

    async def get_collection_aliases(
self, collection_name: str, **kwargs: Any
) -> types.CollectionsAliasesResponse:
return types.CollectionsAliasesResponse(
aliases=[
rest_models.AliasDescription(alias_name=alias_name, collection_name=name)
for (alias_name, name) in self.aliases.items()
if name == collection_name
]
)

    async def get_aliases(self, **kwargs: Any) -> types.CollectionsAliasesResponse:
return types.CollectionsAliasesResponse(
aliases=[
rest_models.AliasDescription(alias_name=alias_name, collection_name=name)
for (alias_name, name) in self.aliases.items()
]
)

    async def get_collections(self, **kwargs: Any) -> types.CollectionsResponse:
return types.CollectionsResponse(
collections=[
rest_models.CollectionDescription(name=name)
for (name, _) in self.collections.items()
]
)

    async def get_collection(self, collection_name: str, **kwargs: Any) -> types.CollectionInfo:
collection = self._get_collection(collection_name)
return collection.info()

    async def update_collection(self, collection_name: str, **kwargs: Any) -> bool:
self._get_collection(collection_name)
return False
def _collection_path(self, collection_name: str) -> Optional[str]:
if self.persistent:
return os.path.join(self.location, "collection", collection_name)
else:
return None

    async def delete_collection(self, collection_name: str, **kwargs: Any) -> bool:
_collection = self.collections.pop(collection_name, None)
del _collection
self.aliases = {
alias_name: name
for (alias_name, name) in self.aliases.items()
if name != collection_name
}
collection_path = self._collection_path(collection_name)
if collection_path is not None:
shutil.rmtree(collection_path, ignore_errors=True)
self._save()
return True

    async def create_collection(
self,
collection_name: str,
vectors_config: Union[types.VectorParams, Mapping[str, types.VectorParams]],
init_from: Optional[types.InitFrom] = None,
sparse_vectors_config: Optional[Mapping[str, types.SparseVectorParams]] = None,
**kwargs: Any,
) -> bool:
src_collection = None
from_collection_name = None
if init_from is not None:
from_collection_name = (
init_from if isinstance(init_from, str) else init_from.collection
)
src_collection = self._get_collection(from_collection_name)
if collection_name in self.collections:
raise ValueError(f"Collection {collection_name} already exists")
collection_path = self._collection_path(collection_name)
if collection_path is not None:
os.makedirs(collection_path, exist_ok=True)
collection = LocalCollection(
rest_models.CreateCollection(
vectors=vectors_config, sparse_vectors=sparse_vectors_config
),
location=collection_path,
force_disable_check_same_thread=self.force_disable_check_same_thread,
)
self.collections[collection_name] = collection
if src_collection and from_collection_name:
batch_size = 100
(records, next_offset) = await self.scroll(
                from_collection_name, limit=batch_size, with_vectors=True
)
self.upload_records(collection_name, records)
while next_offset is not None:
(records, next_offset) = await self.scroll(
from_collection_name, offset=next_offset, limit=batch_size, with_vectors=True
)
self.upload_records(collection_name, records)
self._save()
return True
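    # Collection creation sketch with a single unnamed dense vector; size and
    # distance are illustrative:
    #
    #     await client.create_collection(
    #         collection_name="demo",
    #         vectors_config=rest_models.VectorParams(
    #             size=4, distance=rest_models.Distance.COSINE
    #         ),
    #     )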

    async def recreate_collection(
self,
collection_name: str,
vectors_config: Union[types.VectorParams, Mapping[str, types.VectorParams]],
init_from: Optional[types.InitFrom] = None,
sparse_vectors_config: Optional[Mapping[str, types.SparseVectorParams]] = None,
**kwargs: Any,
) -> bool:
await self.delete_collection(collection_name)
return await self.create_collection(
collection_name, vectors_config, init_from, sparse_vectors_config
)

    def upload_records(
self, collection_name: str, records: Iterable[types.Record], **kwargs: Any
) -> None:
collection = self._get_collection(collection_name)
collection.upsert(
[
rest_models.PointStruct(
id=record.id, vector=record.vector or {}, payload=record.payload or {}
)
for record in records
]
)

    def upload_collection(
self,
collection_name: str,
vectors: Union[
Dict[str, types.NumpyArray], types.NumpyArray, Iterable[types.VectorStruct]
],
payload: Optional[Iterable[Dict[Any, Any]]] = None,
ids: Optional[Iterable[types.PointId]] = None,
**kwargs: Any,
) -> None:
collection = self._get_collection(collection_name)
if isinstance(vectors, dict) and any(
(isinstance(v, np.ndarray) for v in vectors.values())
):
            assert (
                len(set(arr.shape[0] for arr in vectors.values())) == 1
            ), "Each named vector field must contain the same number of vectors"
num_vectors = next(iter(vectors.values())).shape[0]
vectors = [
{name: vectors[name][i].tolist() for name in vectors.keys()}
for i in range(num_vectors)
]
collection.upsert(
[
rest_models.PointStruct(
id=point_id,
vector=(vector.tolist() if isinstance(vector, np.ndarray) else vector) or {},
payload=payload or {},
)
for (point_id, vector, payload) in zip(
ids or itertools.count(), iter(vectors), payload or itertools.cycle([{}])
)
]
)
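    # Bulk upload sketch: an (n, dim) numpy matrix is split row-wise into
    # points, zipped positionally with ids and payloads (values illustrative).
    # Note this helper is synchronous even in the async client:
    #
    #     client.upload_collection(
    #         collection_name="demo",
    #         vectors=np.random.rand(100, 4),
    #         payload=({"index": i} for i in range(100)),
    #         ids=range(100),
    #     )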

    async def create_payload_index(
self,
collection_name: str,
field_name: str,
field_schema: Optional[types.PayloadSchemaType] = None,
field_type: Optional[types.PayloadSchemaType] = None,
**kwargs: Any,
) -> types.UpdateResult:
logging.warning(
"Payload indexes have no effect in the local Qdrant. Please use server Qdrant if you need payload indexes."
)
return self._default_update_result()

    async def delete_payload_index(
self, collection_name: str, field_name: str, **kwargs: Any
) -> types.UpdateResult:
logging.warning(
"Payload indexes have no effect in the local Qdrant. Please use server Qdrant if you need payload indexes."
)
return self._default_update_result()

    async def list_snapshots(
self, collection_name: str, **kwargs: Any
) -> List[types.SnapshotDescription]:
return []

    async def create_snapshot(
self, collection_name: str, **kwargs: Any
) -> Optional[types.SnapshotDescription]:
        raise NotImplementedError(
            "Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need snapshots."
        )

    async def delete_snapshot(
self, collection_name: str, snapshot_name: str, **kwargs: Any
) -> bool:
        raise NotImplementedError(
            "Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need snapshots."
        )

    async def list_full_snapshots(self, **kwargs: Any) -> List[types.SnapshotDescription]:
return []

    async def create_full_snapshot(self, **kwargs: Any) -> types.SnapshotDescription:
raise NotImplementedError(
"Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need full snapshots."
)

    async def delete_full_snapshot(self, snapshot_name: str, **kwargs: Any) -> bool:
raise NotImplementedError(
"Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need full snapshots."
)

    async def recover_snapshot(self, collection_name: str, location: str, **kwargs: Any) -> bool:
        raise NotImplementedError(
            "Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need snapshots."
        )

    async def list_shard_snapshots(
self, collection_name: str, shard_id: int, **kwargs: Any
) -> List[types.SnapshotDescription]:
return []

    async def create_shard_snapshot(
self, collection_name: str, shard_id: int, **kwargs: Any
) -> Optional[types.SnapshotDescription]:
raise NotImplementedError(
"Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need snapshots."
)

    async def delete_shard_snapshot(
self, collection_name: str, shard_id: int, snapshot_name: str, **kwargs: Any
) -> bool:
raise NotImplementedError(
"Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need snapshots."
)

    async def recover_shard_snapshot(
self, collection_name: str, shard_id: int, location: str, **kwargs: Any
) -> bool:
raise NotImplementedError(
"Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need snapshots."
)

    async def lock_storage(self, reason: str, **kwargs: Any) -> types.LocksOption:
        raise NotImplementedError(
            "Locks are not supported in the local Qdrant. Please use server Qdrant if you need storage locks."
        )

    async def unlock_storage(self, **kwargs: Any) -> types.LocksOption:
        raise NotImplementedError(
            "Locks are not supported in the local Qdrant. Please use server Qdrant if you need storage locks."
        )

    async def get_locks(self, **kwargs: Any) -> types.LocksOption:
return types.LocksOption(error_message=None, write=False)

    async def create_shard_key(
self,
collection_name: str,
shard_key: types.ShardKey,
shards_number: Optional[int] = None,
replication_factor: Optional[int] = None,
placement: Optional[List[int]] = None,
**kwargs: Any,
) -> bool:
raise NotImplementedError(
"Sharding is not supported in the local Qdrant. Please use server Qdrant if you need sharding."
)

    async def delete_shard_key(
self, collection_name: str, shard_key: types.ShardKey, **kwargs: Any
) -> bool:
raise NotImplementedError(
"Sharding is not supported in the local Qdrant. Please use server Qdrant if you need sharding."
)