From 2ee2e2d2fbb0460028fb885017c8ee0d2bc3b857 Mon Sep 17 00:00:00 2001
From: Alex Petenchea
Date: Tue, 5 Aug 2025 05:09:56 +0000
Subject: [PATCH] Adding /_api/import

---
 arangoasync/collection.py | 102 ++++++++++++++++++++++++++++++++++++++
 docs/document.rst         |  33 ++++++++++++
 tests/test_collection.py  |  18 +++++++
 3 files changed, 153 insertions(+)

diff --git a/arangoasync/collection.py b/arangoasync/collection.py
index e3d12ee..52a9d9e 100644
--- a/arangoasync/collection.py
+++ b/arangoasync/collection.py
@@ -1578,6 +1578,108 @@ def response_handler(
 
         return await self._executor.execute(request, response_handler)
 
+    async def import_bulk(
+        self,
+        documents: bytes | str,
+        doc_type: Optional[str] = None,
+        complete: Optional[bool] = True,
+        details: Optional[bool] = True,
+        from_prefix: Optional[str] = None,
+        to_prefix: Optional[str] = None,
+        overwrite: Optional[bool] = None,
+        overwrite_collection_prefix: Optional[bool] = None,
+        on_duplicate: Optional[str] = None,
+        wait_for_sync: Optional[bool] = None,
+        ignore_missing: Optional[bool] = None,
+    ) -> Result[Json]:
+        """Load JSON data in bulk into ArangoDB.
+
+        Args:
+            documents (bytes | str): The JSON data to import, as a string or
+                raw bytes.
+            doc_type (str | None): Determines how the body of the request is
+                interpreted. Possible values: "documents" (one JSON object per
+                line), "array" (a JSON array of objects), "auto" (detect
+                either of the former two), or "" (tabular format, where the
+                first line lists the attribute names and each subsequent line
+                holds the attribute values of one document).
+            complete (bool | None): If set to `True`, the whole import fails
+                if any error occurs. Otherwise, the import continues even if
+                some documents are invalid and cannot be imported, skipping
+                the problematic documents.
+            details (bool | None): If set to `True`, the result includes a
+                `details` attribute with information about documents that
+                could not be imported.
+            from_prefix (str | None): String prefix prepended to the value of
+                the "_from" field in each edge document inserted. For example,
+                prefix "foo" prepended to "_from": "bar" results in
+                "_from": "foo/bar". Applies only to edge collections.
+            to_prefix (str | None): String prefix prepended to the value of
+                the "_to" field in each edge document inserted. For example,
+                prefix "foo" prepended to "_to": "bar" results in
+                "_to": "foo/bar". Applies only to edge collections.
+            overwrite (bool | None): If set to `True`, all existing documents
+                are removed prior to the import. Indexes are still preserved.
+            overwrite_collection_prefix (bool | None): If set to `True`, apply
+                `from_prefix` and `to_prefix` even to "_from" and "_to" values
+                that already contain a collection name prefix, replacing the
+                existing prefix.
+            on_duplicate (str | None): Action to take on unique key constraint
+                violations (for documents with "_key" fields). Allowed values
+                are "error" (do not import the new documents and count them as
+                errors), "update" (update the existing documents while
+                preserving any fields missing in the new ones), "replace"
+                (replace the existing documents with new ones), and "ignore"
+                (do not import the new documents and count them as ignored, as
+                opposed to counting them as errors). Options "update" and
+                "replace" may fail on secondary unique key constraint
+                violations.
+            wait_for_sync (bool | None): Block until the operation is
+                synchronized to disk.
+            ignore_missing (bool | None): When importing JSON arrays of
+                tabular data (the `doc_type` parameter is omitted), the first
+                line of the request body defines the attribute keys and the
+                subsequent lines the attribute values for each document.
+                Subsequent lines with a different number of elements than the
+                first line are not imported by default. You can enable this
+                option to import them anyway.
+                For the missing elements, the document attributes are
+                omitted. Excess elements are ignored.
+
+        Returns:
+            dict: Result of the import operation.
+
+        Raises:
+            DocumentInsertError: If import fails.
+
+        References:
+            - `import-json-data-as-documents <https://docs.arangodb.com/stable/develop/http-api/import/#import-json-data-as-documents>`__
+        """  # noqa: E501
+        params: Params = dict()
+        params["collection"] = self.name
+        if doc_type is not None:
+            params["type"] = doc_type
+        if complete is not None:
+            params["complete"] = complete
+        if details is not None:
+            params["details"] = details
+        if from_prefix is not None:
+            params["fromPrefix"] = from_prefix
+        if to_prefix is not None:
+            params["toPrefix"] = to_prefix
+        if overwrite is not None:
+            params["overwrite"] = overwrite
+        if overwrite_collection_prefix is not None:
+            params["overwriteCollectionPrefix"] = overwrite_collection_prefix
+        if on_duplicate is not None:
+            params["onDuplicate"] = on_duplicate
+        if wait_for_sync is not None:
+            params["waitForSync"] = wait_for_sync
+        if ignore_missing is not None:
+            params["ignoreMissing"] = ignore_missing
+
+        def response_handler(resp: Response) -> Json:
+            if not resp.is_success:
+                raise DocumentInsertError(resp, request)
+            result: Json = self.deserializer.loads(resp.raw_body)
+            return result
+
+        request = Request(
+            method=Method.POST,
+            endpoint="/_api/import",
+            data=documents,
+            params=params,
+        )
+
+        return await self._executor.execute(request, response_handler)
+
 
 class StandardCollection(Collection[T, U, V]):
     """Standard collection API wrapper.

diff --git a/docs/document.rst b/docs/document.rst
index c0764e8..47619db 100644
--- a/docs/document.rst
+++ b/docs/document.rst
@@ -150,6 +150,39 @@ Standard documents are managed via collection API wrapper:
     # Delete one or more matching documents.
     await students.delete_match({"first": "Emma"})
 
+Importing documents in bulk with `import_bulk` is faster than inserting them
+one at a time. Suppose our data is in a file in JSON Lines (JSONL) format,
+where each line is one JSON object. Example of a "students.jsonl" file:
+
+.. code-block:: json
+
+    {"_key":"john","name":"John Smith","age":35}
+    {"_key":"katie","name":"Katie Foster","age":28}
+
+To import this file into the "students" collection, we can use the `import_bulk` API:
+
+.. code-block:: python
+
+    from arangoasync import ArangoClient
+    from arangoasync.auth import Auth
+    import aiofiles
+
+    async with ArangoClient(hosts="http://localhost:8529") as client:
+        auth = Auth(username="root", password="passwd")
+
+        # Connect to "test" database as root user.
+        db = await client.db("test", auth=auth)
+
+        # Get the API wrapper for "students" collection.
+        students = db.collection("students")
+
+        # Read the JSONL file asynchronously.
+        async with aiofiles.open("students.jsonl", mode="r") as f:
+            documents = await f.read()
+
+        # Import documents in bulk.
+        result = await students.import_bulk(documents, doc_type="documents")
+
 You can manage documents via database API wrappers also, but only simple
 operations (i.e. get, insert, update, replace, delete) are supported and you
 must provide document IDs instead of keys:

diff --git a/tests/test_collection.py b/tests/test_collection.py
index fb8d7ba..2dc4c42 100644
--- a/tests/test_collection.py
+++ b/tests/test_collection.py
@@ -16,6 +16,7 @@
     CollectionStatisticsError,
     CollectionTruncateError,
     DocumentCountError,
+    DocumentInsertError,
     IndexCreateError,
     IndexDeleteError,
     IndexGetError,
@@ -263,3 +264,20 @@ async def test_collection_truncate_count(docs, doc_col, bad_col):
     await doc_col.truncate(wait_for_sync=True, compact=True)
     cnt = await doc_col.count()
     assert cnt == 0
+
+
+@pytest.mark.asyncio
+async def test_collection_import_bulk(doc_col, bad_col, docs):
+    documents = "\n".join(doc_col.serializer.dumps(doc) for doc in docs)
+
+    # Test errors
+    with pytest.raises(DocumentInsertError):
+        await bad_col.import_bulk(documents, doc_type="documents")
+
+    # Insert documents in bulk
+    result = await doc_col.import_bulk(documents, doc_type="documents")
+
+    # Verify the documents were inserted
+    count = await doc_col.count()
+    assert count == len(docs)
+    assert result["created"] == count
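
---

Usage note (not part of the diff): a minimal sketch of how the `on_duplicate`
and edge-prefix options added in this patch behave. It assumes a running
server at localhost:8529, a "test" database, an empty "students" collection,
and a hypothetical "knows" edge collection; the "updated" counter in the
response is an assumption based on the ArangoDB bulk import HTTP API, not
something this diff asserts.

.. code-block:: python

    import asyncio

    from arangoasync import ArangoClient
    from arangoasync.auth import Auth

    async def main() -> None:
        async with ArangoClient(hosts="http://localhost:8529") as client:
            auth = Auth(username="root", password="passwd")
            db = await client.db("test", auth=auth)
            students = db.collection("students")

            # First import: both keys are new, so both documents are created
            # (assumes the collection starts out empty).
            docs = (
                '{"_key":"john","name":"John Smith","age":35}\n'
                '{"_key":"katie","name":"Katie Foster","age":28}'
            )
            result = await students.import_bulk(docs, doc_type="documents")
            assert result["created"] == 2

            # Re-importing an existing key with on_duplicate="update" patches
            # the stored document (fields missing from the new document are
            # preserved) instead of counting it as an error.
            result = await students.import_bulk(
                '{"_key":"john","age":36}',
                doc_type="documents",
                on_duplicate="update",
            )
            assert result["updated"] == 1  # assumed response counter

            # Edge import (hypothetical "knows" edge collection): bare keys in
            # "_from"/"_to" get the collection name prefixed server-side,
            # e.g. "john" becomes "students/john".
            knows = db.collection("knows")
            result = await knows.import_bulk(
                '[{"_from":"john","_to":"katie"}]',
                doc_type="array",
                from_prefix="students",
                to_prefix="students",
            )

    asyncio.run(main())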