Adding import API #61


Merged: 1 commit, Aug 5, 2025
102 changes: 102 additions & 0 deletions arangoasync/collection.py
@@ -1578,6 +1578,108 @@ def response_handler(

        return await self._executor.execute(request, response_handler)

    async def import_bulk(
        self,
        documents: bytes | str,
        doc_type: Optional[str] = None,
        complete: Optional[bool] = True,
        details: Optional[bool] = True,
        from_prefix: Optional[str] = None,
        to_prefix: Optional[str] = None,
        overwrite: Optional[bool] = None,
        overwrite_collection_prefix: Optional[bool] = None,
        on_duplicate: Optional[str] = None,
        wait_for_sync: Optional[bool] = None,
        ignore_missing: Optional[bool] = None,
    ) -> Result[Json]:
        """Load JSON data in bulk into ArangoDB.

        Args:
            documents (bytes | str): The JSON data to import, as raw bytes or a string.
            doc_type (str | None): Determines how the body of the request is interpreted.
                Possible values: "" (JSON arrays of tabular data, where the first
                line lists the attribute names), "documents" (one JSON document
                per line), "array" (a JSON array of documents), and "auto"
                (auto-detect between "documents" and "array").
            complete (bool | None): If set to `True`, the whole import fails if any error occurs.
                Otherwise, the import continues even if some documents are invalid and cannot
                be imported, skipping the problematic documents.
            details (bool | None): If set to `True`, the result includes a `details`
                attribute with information about documents that could not be imported.
            from_prefix (str | None): String prefix prepended to the value of the "_from"
                field in each edge document inserted. For example, prefix "foo"
                prepended to "_from": "bar" results in "_from": "foo/bar".
                Applies only to edge collections.
            to_prefix (str | None): String prefix prepended to the value of the "_to"
                field in each edge document inserted. For example, prefix "foo"
                prepended to "_to": "bar" results in "_to": "foo/bar".
                Applies only to edge collections.
            overwrite (bool | None): If set to `True`, all existing documents are removed
                prior to the import. Indexes are still preserved.
            overwrite_collection_prefix (bool | None): If set to `True`, apply
                `from_prefix` and `to_prefix` even to "_from" and "_to" values
                that already contain a collection name prefix, replacing the
                existing prefix.
            on_duplicate (str | None): Action to take on unique key constraint violations
                (for documents with "_key" fields). Allowed values are "error" (do
                not import the new documents and count them as errors), "update"
                (update the existing documents while preserving any fields missing
                in the new ones), "replace" (replace the existing documents with
                new ones), and "ignore" (do not import the new documents and count
                them as ignored, as opposed to counting them as errors). Options
                "update" and "replace" may fail on secondary unique key constraint
                violations.
            wait_for_sync (bool | None): Block until the operation is synchronized to disk.
            ignore_missing (bool | None): When importing JSON arrays of tabular data
                (the `type` parameter is omitted), the first line of the request body
                defines the attribute keys and the subsequent lines the attribute
                values for each document. Subsequent lines with a different number
                of elements than the first line are not imported by default. You can
                enable this option to import them anyway. For the missing elements,
                the document attributes are omitted. Excess elements are ignored.

        Returns:
            dict: Result of the import operation.

        Raises:
            DocumentInsertError: If import fails.

        References:
            - `import-json-data-as-documents <https://docs.arangodb.com/stable/develop/http-api/import/#import-json-data-as-documents>`__
        """  # noqa: E501
        params: Params = dict()
        params["collection"] = self.name
        if doc_type is not None:
            params["type"] = doc_type
        if complete is not None:
            params["complete"] = complete
        if details is not None:
            params["details"] = details
        if from_prefix is not None:
            params["fromPrefix"] = from_prefix
        if to_prefix is not None:
            params["toPrefix"] = to_prefix
        if overwrite is not None:
            params["overwrite"] = overwrite
        if overwrite_collection_prefix is not None:
            params["overwriteCollectionPrefix"] = overwrite_collection_prefix
        if on_duplicate is not None:
            params["onDuplicate"] = on_duplicate
        if wait_for_sync is not None:
            params["waitForSync"] = wait_for_sync
        if ignore_missing is not None:
            params["ignoreMissing"] = ignore_missing

        request = Request(
            method=Method.POST,
            endpoint="/_api/import",
            data=documents,
            params=params,
        )

        def response_handler(resp: Response) -> Json:
            if not resp.is_success:
                raise DocumentInsertError(resp, request)
            result: Json = self.deserializer.loads(resp.raw_body)
            return result

        return await self._executor.execute(request, response_handler)


class StandardCollection(Collection[T, U, V]):
"""Standard collection API wrapper.
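As a usage illustration for the prefix options documented above, here is a minimal sketch of importing edge documents with `from_prefix` and `to_prefix`, which map to the `fromPrefix` and `toPrefix` query parameters. The `db` handle and the "knows" edge collection are hypothetical, and the snippet assumes it runs inside an async context like the one in docs/document.rst below:

    # Hypothetical edge collection; "db" is an already-connected database
    # handle as in the documentation example below.
    knows = db.collection("knows")

    # Bare keys in "_from"/"_to" get the prefix prepended, so
    # "_from": "john" becomes "_from": "students/john".
    edges = "\n".join([
        '{"_from":"john","_to":"katie"}',
        '{"_from":"katie","_to":"john"}',
    ])
    result = await knows.import_bulk(
        edges,
        doc_type="documents",
        from_prefix="students",
        to_prefix="students",
    )
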
33 changes: 33 additions & 0 deletions docs/document.rst
@@ -150,6 +150,39 @@ Standard documents are managed via collection API wrapper:
    # Delete one or more matching documents.
    await students.delete_match({"first": "Emma"})

Importing documents in bulk with a specialized method is faster than inserting
them one at a time. Suppose our data is in a file in JSON Lines (JSONL) format,
where each line is one JSON object. Example of a "students.jsonl" file:

.. code-block:: json

    {"_key":"john","name":"John Smith","age":35}
    {"_key":"katie","name":"Katie Foster","age":28}

To import this file into the "students" collection, we can use the `import_bulk` API:

.. code-block:: python

    from arangoasync import ArangoClient
    from arangoasync.auth import Auth
    import aiofiles

    async with ArangoClient(hosts="http://localhost:8529") as client:
        auth = Auth(username="root", password="passwd")

        # Connect to "test" database as root user.
        db = await client.db("test", auth=auth)

        # Get the API wrapper for "students" collection.
        students = db.collection("students")

        # Read the JSONL file asynchronously.
        async with aiofiles.open('students.jsonl', mode='r') as f:
            documents = await f.read()

        # Import documents in bulk.
        result = await students.import_bulk(documents, doc_type="documents")
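
The returned dictionary contains the counters reported by the server, such as
`created` and `errors` (plus a `details` list when requested). As a minimal
sketch, assuming the same "students" collection as above and the documented
`updated` counter, duplicate keys can be handled with the `on_duplicate` option
instead of failing the import:

.. code-block:: python

    # Re-import the same file, updating documents whose "_key" already
    # exists. complete=False lets the import continue past per-document
    # errors instead of aborting.
    result = await students.import_bulk(
        documents,
        doc_type="documents",
        complete=False,
        on_duplicate="update",
    )
    print(result["created"], result["updated"], result["errors"])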

You can also manage documents via the database API wrapper, but only simple
operations (i.e. get, insert, update, replace, delete) are supported, and you
must provide document IDs instead of keys:
18 changes: 18 additions & 0 deletions tests/test_collection.py
@@ -16,6 +16,7 @@
    CollectionStatisticsError,
    CollectionTruncateError,
    DocumentCountError,
    DocumentInsertError,
    IndexCreateError,
    IndexDeleteError,
    IndexGetError,
@@ -263,3 +264,20 @@ async def test_collection_truncate_count(docs, doc_col, bad_col):
    await doc_col.truncate(wait_for_sync=True, compact=True)
    cnt = await doc_col.count()
    assert cnt == 0


@pytest.mark.asyncio
async def test_collection_import_bulk(doc_col, bad_col, docs):
    documents = "\n".join(doc_col.serializer.dumps(doc) for doc in docs)

    # Test errors
    with pytest.raises(DocumentInsertError):
        await bad_col.import_bulk(documents, doc_type="documents")

    # Insert documents in bulk
    result = await doc_col.import_bulk(documents, doc_type="documents")

    # Verify the documents were inserted
    count = await doc_col.count()
    assert count == len(docs)
    assert result["created"] == count
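
As a possible follow-up (a sketch assuming the same fixtures and the `ignored`
and `errors` counters reported by the import API), duplicate handling could be
exercised like this:

@pytest.mark.asyncio
async def test_collection_import_bulk_ignore_duplicates(doc_col, docs):
    documents = "\n".join(doc_col.serializer.dumps(doc) for doc in docs)

    # Seed the collection, tolerating keys that may already exist.
    await doc_col.import_bulk(
        documents, doc_type="documents", complete=False, on_duplicate="ignore"
    )

    # The second pass hits only duplicates, which are counted as ignored
    # rather than as errors.
    result = await doc_col.import_bulk(
        documents, doc_type="documents", complete=False, on_duplicate="ignore"
    )
    assert result["ignored"] == len(docs)
    assert result["errors"] == 0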