Source code for mlclient.clients.documents_client

"""The ML Documents Client module.

It exports a high-level class to perform CRUD operations in a MarkLogic server:
    * DocumentsClient
        An MLResourceClient calling /v1/documents endpoint.
"""

from __future__ import annotations

from typing import Any, Iterator

from requests import Response

from mlclient import constants
from mlclient.calls import DocumentsDeleteCall, DocumentsGetCall, DocumentsPostCall
from mlclient.clients import MLResourceClient
from mlclient.exceptions import MarkLogicError
from mlclient.mimetypes import Mimetypes
from mlclient.ml_response_parser import MLResponseParser
from mlclient.structures import (
    Document,
    DocumentFactory,
    Metadata,
    MetadataDocument,
    RawDocument,
    RawStringDocument,
)
from mlclient.structures.calls import (
    Category,
    ContentDispositionSerializer,
    DocumentsBodyPart,
    DocumentsContentDisposition,
)


class DocumentsClient(MLResourceClient):
    """An MLResourceClient calling /v1/documents endpoint.

    It is a high-level class performing CRUD operations in a MarkLogic server.
    """

    def create(
        self,
        data: Document | Metadata | list[Document | Metadata],
        database: str | None = None,
        temporal_collection: str | None = None,
    ) -> dict:
        """Create or update document(s) content or metadata in a MarkLogic database.

        Parameters
        ----------
        data : Document | Metadata | list[Document | Metadata]
            One or more document or default metadata.
        database : str | None, default None
            Perform this operation on the named content database instead
            of the default content database associated with the REST API instance.
        temporal_collection : str | None, default None
            Specify the name of a temporal collection into which the documents
            are to be inserted.

        Returns
        -------
        dict
            An origin response from a MarkLogic server.

        Raises
        ------
        MarkLogicError
            If MarkLogic returns an error
        """
        body_parts = DocumentsSender.parse(data)
        call = self._post_call(
            body_parts=body_parts,
            database=database,
            temporal_collection=temporal_collection,
        )
        resp = self.call(call)
        self._raise_for_ml_error(resp)
        return MLResponseParser.parse(resp)

    def read(
        self,
        uris: str | list[str] | tuple[str] | set[str],
        category: str | list | None = None,
        database: str | None = None,
        output_type: type | None = None,
    ) -> Document | list[Document]:
        """Return document(s) content or metadata from a MarkLogic database.

        When uris is a string it returns a single Document instance. Otherwise,
        result is a list.

        Parameters
        ----------
        uris : str | list[str] | tuple[str] | set[str]
            One or more URIs for documents in the database.
        category : str | list | None, default None
            The category of data to fetch about the requested document.
            Category can be specified multiple times to retrieve any combination
            of content and metadata. Valid categories: content (default), metadata,
            metadata-values, collections, permissions, properties, and quality.
            Use metadata to request all categories except content.
        database : str | None, default None
            Perform this operation on the named content database instead
            of the default content database associated with the REST API instance.
        output_type : type | None, default None
            A raw output type (supported: str, bytes)

        Returns
        -------
        Document | list[Document]
            One or more documents from the database.

        Raises
        ------
        MarkLogicError
            If MarkLogic returns an error
        """
        call = self._get_call(uris=uris, category=category, database=database)
        resp = self.call(call)
        self._raise_for_ml_error(resp)
        return DocumentsReader.parse(resp, uris, category, output_type)

    def delete(
        self,
        uris: str | list[str] | tuple[str] | set[str],
        category: str | list | None = None,
        database: str | None = None,
        temporal_collection: str | None = None,
        wipe_temporal: bool | None = None,
    ) -> None:
        """Delete document(s) content or metadata in a MarkLogic database.

        Parameters
        ----------
        uris : str | list[str] | tuple[str] | set[str]
            The URI of a document to delete or for which to remove metadata.
            You can specify multiple documents.
        category : str | list | None, default None
            The category of data to remove/reset.
            Category may be specified multiple times to remove or reset
            any combination of content and metadata.
            Valid categories: content (default), metadata, metadata-values,
            collections, permissions, properties, and quality.
            Use metadata to reset all metadata.
        database : str | None, default None
            Perform this operation on the named content database instead
            of the default content database associated with the REST API instance.
        temporal_collection : str | None, default None
            Specify the name of a temporal collection that contains the document(s)
            to be deleted. Applies to all documents when deleting more than one.
        wipe_temporal : bool | None, default None
            Remove all versions of a temporal document rather than performing
            a temporal delete. You can only use this parameter when you also specify
            a temporal-collection parameter.

        Raises
        ------
        MarkLogicError
            If MarkLogic returns an error
        """
        call = self._delete_call(
            uris=uris,
            category=category,
            database=database,
            temporal_collection=temporal_collection,
            wipe_temporal=wipe_temporal,
        )
        resp = self.call(call)
        self._raise_for_ml_error(resp)

    @classmethod
    def _raise_for_ml_error(
        cls,
        resp: Response,
    ) -> None:
        """Raise a MarkLogicError when the response is not successful.

        Shared by create, read and delete so the error handling stays in one
        place. The response body is parsed only when resp.ok is falsy.

        Parameters
        ----------
        resp : Response
            A MarkLogic Server response

        Raises
        ------
        MarkLogicError
            If resp.ok is falsy; built from the "errorResponse" entry
            of the parsed response body.
        """
        if resp.ok:
            return
        resp_body = MLResponseParser.parse(resp)
        raise MarkLogicError(resp_body["errorResponse"])

    @classmethod
    def _post_call(
        cls,
        body_parts: list[DocumentsBodyPart],
        database: str | None,
        temporal_collection: str | None,
    ) -> DocumentsPostCall:
        """Prepare a DocumentsPostCall instance.

        Parameters
        ----------
        body_parts : list[DocumentsBodyPart]
            A list of multipart request body parts
        database : str | None
            Perform this operation on the named content database instead
            of the default content database associated with the REST API instance.
        temporal_collection : str | None
            Specify the name of a temporal collection into which the documents
            are to be inserted.

        Returns
        -------
        DocumentsPostCall
            A prepared DocumentsPostCall instance
        """
        return DocumentsPostCall(
            body_parts=body_parts,
            database=database,
            temporal_collection=temporal_collection,
        )

    @classmethod
    def _get_call(
        cls,
        uris: str | list[str] | tuple[str] | set[str],
        category: str | list | None,
        database: str | None,
    ) -> DocumentsGetCall:
        """Prepare a DocumentsGetCall instance.

        The data format is always set to json, regardless of the requested
        categories.

        Parameters
        ----------
        uris : str | list[str] | tuple[str] | set[str]
            One or more URIs for documents in the database.
        category : str | list | None
            The category of data to fetch about the requested document.
            Category can be specified multiple times to retrieve any combination
            of content and metadata. Valid categories: content (default), metadata,
            metadata-values, collections, permissions, properties, and quality.
            Use metadata to request all categories except content.
        database : str | None
            Perform this operation on the named content database instead
            of the default content database associated with the REST API instance.

        Returns
        -------
        DocumentsGetCall
            A prepared DocumentsGetCall instance
        """
        return DocumentsGetCall(
            uri=uris,
            category=category,
            database=database,
            data_format="json",
        )

    @classmethod
    def _delete_call(
        cls,
        uris: str | list[str] | tuple[str] | set[str],
        category: str | list | None,
        database: str | None,
        temporal_collection: str | None,
        wipe_temporal: bool | None,
    ) -> DocumentsDeleteCall:
        """Prepare a DocumentsDeleteCall instance.

        Parameters
        ----------
        uris : str | list[str] | tuple[str] | set[str]
            The URI of a document to delete or for which to remove metadata.
            You can specify multiple documents.
        category : str | list | None
            The category of data to remove/reset.
            Category may be specified multiple times to remove or reset
            any combination of content and metadata.
            Valid categories: content (default), metadata, metadata-values,
            collections, permissions, properties, and quality.
            Use metadata to reset all metadata.
        database : str | None
            Perform this operation on the named content database instead
            of the default content database associated with the REST API instance.
        temporal_collection : str | None
            Specify the name of a temporal collection that contains the document(s)
            to be deleted. Applies to all documents when deleting more than one.
        wipe_temporal : bool | None
            Remove all versions of a temporal document rather than performing
            a temporal delete. You can only use this parameter when you also specify
            a temporal-collection parameter.

        Returns
        -------
        DocumentsDeleteCall
            A prepared DocumentsDeleteCall instance
        """
        return DocumentsDeleteCall(
            uri=uris,
            category=category,
            database=database,
            temporal_collection=temporal_collection,
            wipe_temporal=wipe_temporal,
        )
class DocumentsSender:
    """A class parsing Document or Metadata instance(s) to DocumentsBodyPart's list."""

    @classmethod
    def parse(
        cls,
        data: Document | Metadata | list[Document | Metadata],
    ) -> list[DocumentsBodyPart]:
        """Parse Document or Metadata instance(s) to DocumentsBodyPart's list.

        A Metadata instance produces a default-metadata part, a MetadataDocument
        produces a document-metadata part, and any other unit produces a content
        part preceded by a metadata part when its metadata is not None.

        Parameters
        ----------
        data : Document | Metadata | list[Document | Metadata]
            One or more document or default metadata.

        Returns
        -------
        list[DocumentsBodyPart]
            A list of multipart /v1/documents request body parts
        """
        if not isinstance(data, list):
            data = [data]

        body_parts = []
        for data_unit in data:
            # Exact type checks (not isinstance) are kept on purpose: only
            # these two concrete classes are treated as metadata-only units.
            unit_type = type(data_unit)
            if unit_type is Metadata:
                body_parts.append(cls._get_default_metadata_body_part(data_unit))
            elif unit_type is MetadataDocument:
                body_parts.append(cls._get_doc_metadata_body_part(data_unit))
            else:
                # The metadata part goes before the content part.
                if data_unit.metadata is not None:
                    body_parts.append(cls._get_doc_metadata_body_part(data_unit))
                body_parts.append(cls._get_doc_content_body_part(data_unit))
        return body_parts

    @classmethod
    def _get_doc_content_body_part(
        cls,
        document: Document,
    ) -> DocumentsBodyPart:
        """Instantiate DocumentsBodyPart with Document's content.

        Parameters
        ----------
        document : Document
            A document to build a request body part.

        Returns
        -------
        DocumentsBodyPart
            A multipart /v1/documents request body part
        """
        # The keys contain hyphens, so they have to be passed as a dict.
        return DocumentsBodyPart(
            **{
                "content-type": Mimetypes.get_mimetype(document.uri),
                "content-disposition": {
                    "body_part_type": "attachment",
                    "filename": document.uri,
                    "format": document.doc_type,
                },
                "content": document.content_bytes,
            },
        )

    @classmethod
    def _get_doc_metadata_body_part(
        cls,
        document: Document,
    ) -> DocumentsBodyPart:
        """Instantiate DocumentsBodyPart with Document's metadata.

        Metadata of RawDocument and RawStringDocument instances is sent as is;
        for any other document type it is serialized with to_json_string().

        Parameters
        ----------
        document : Document
            A document to build a request body part.

        Returns
        -------
        DocumentsBodyPart
            A multipart /v1/documents request body part
        """
        metadata = document.metadata
        if type(document) not in (RawDocument, RawStringDocument):
            metadata = metadata.to_json_string()
        return DocumentsBodyPart(
            **{
                "content-type": constants.HEADER_JSON,
                "content-disposition": {
                    "body_part_type": "attachment",
                    "filename": document.uri,
                    "category": "metadata",
                },
                "content": metadata,
            },
        )

    @classmethod
    def _get_default_metadata_body_part(
        cls,
        metadata: Metadata,
    ) -> DocumentsBodyPart:
        """Instantiate DocumentsBodyPart with default metadata.

        Parameters
        ----------
        metadata : Metadata
            Metadata to build a request body part.

        Returns
        -------
        DocumentsBodyPart
            A multipart /v1/documents request body part
        """
        metadata = metadata.to_json_string()
        return DocumentsBodyPart(
            **{
                "content-type": constants.HEADER_JSON,
                "content-disposition": {
                    "body_part_type": "inline",
                    "category": "metadata",
                },
                "content": metadata,
            },
        )


class DocumentsReader:
    """A class parsing raw MarkLogic response to Document instance(s)."""

    @classmethod
    def parse(
        cls,
        resp: Response,
        uris: str | list[str] | tuple[str] | set[str],
        category: str | list | None,
        output_type: type | None = None,
    ) -> Document | list[Document]:
        """Parse a MarkLogic response to Documents.

        Parameters
        ----------
        resp : Response
            A MarkLogic Server response
        uris : str | list[str] | tuple[str] | set[str]
            One or more URIs for documents in the database.
        category : str | list | None
            The category of data to fetch about the requested document.
            Category can be specified multiple times to retrieve any combination
            of content and metadata. Valid categories: content (default), metadata,
            metadata-values, collections, permissions, properties, and quality.
            Use metadata to request all categories except content.
        output_type : type | None, default None
            A raw output type (supported: str, bytes)

        Returns
        -------
        Document | list[Document]
            A single Document instance or their list depending on uris type.
        """
        parsed_resp = cls._parse_response(resp, output_type)
        # A missing Content-Type header is treated as "not multipart"
        # instead of failing on None.startswith(...).
        content_type = resp.headers.get(constants.HEADER_NAME_CONTENT_TYPE) or ""
        is_multipart = content_type.startswith(constants.HEADER_MULTIPART_MIXED)
        documents_data = cls._pre_format_data(parsed_resp, is_multipart, uris, category)
        docs = cls._parse_to_documents(documents_data, output_type)
        if isinstance(uris, str):
            return docs[0]
        return docs

    @classmethod
    def _parse_response(
        cls,
        resp: Response,
        output_type: type | None,
    ) -> list[tuple]:
        """Parse a response from a MarkLogic server.

        A single-part result is wrapped into a one-element list, unless its
        Content-Length header equals "0" - then an empty list is returned.

        Parameters
        ----------
        resp : Response
            A MarkLogic Server response
        output_type : type | None, default None
            A raw output type (supported: str, bytes)

        Returns
        -------
        list[tuple]
            A parsed response parts with headers
        """
        parsed_resp = MLResponseParser.parse_with_headers(resp, output_type)
        if isinstance(parsed_resp, list):
            return parsed_resp

        headers, _ = parsed_resp
        if headers.get(constants.HEADER_NAME_CONTENT_LENGTH) == "0":
            return []
        return [parsed_resp]

    @classmethod
    def _pre_format_data(
        cls,
        parsed_resp: list[tuple],
        is_multipart: bool,
        uris: str | list[str] | tuple[str] | set[str],
        category: str | list | None,
    ) -> Iterator[dict]:
        """Prepare data to initialize Document instances.

        Parameters
        ----------
        parsed_resp : list[tuple]
            A parsed MarkLogic response parts with headers
        is_multipart : bool
            A flag informing whether the response is multipart/mixed or not
        uris : str | list[str] | tuple[str] | set[str]
            One or more URIs for documents in the database.
        category : str | list | None
            The category of data to fetch about the requested document.
            Category can be specified multiple times to retrieve any combination
            of content and metadata. Valid categories: content (default), metadata,
            metadata-values, collections, permissions, properties, and quality.
            Use metadata to request all categories except content.

        Returns
        -------
        Iterator[dict]
            An iterator of pre-formatted data in form of dictionaries
        """
        if is_multipart:
            return cls._pre_format_documents(parsed_resp, category)
        return cls._pre_format_document(parsed_resp, uris, category)

    @classmethod
    def _pre_format_documents(
        cls,
        parsed_resp: list[tuple],
        origin_category: str | list | None,
    ) -> Iterator[dict]:
        """Prepare document parts to initialize Document instances.

        When both content and metadata are expected, the first part seen for
        a filename is held back and merged with the next part of the same
        filename; otherwise every part is yielded on its own.

        Parameters
        ----------
        parsed_resp : list[tuple]
            A parsed MarkLogic response parts with headers
        origin_category : str | list | None
            Categories provided by the user

        Returns
        -------
        Iterator[dict]
            An iterator of pre-formatted data in form of dictionaries
        """
        expect_content, expect_metadata = cls._expect_categories(origin_category)
        pre_formatted_data = {}
        for headers, parse_resp_body in parsed_resp:
            raw_content_disp = headers.get(constants.HEADER_NAME_CONTENT_DISP)
            content_disp = ContentDispositionSerializer.serialize(raw_content_disp)
            partial_data = cls._get_partial_data(content_disp, parse_resp_body)

            if not (expect_content and expect_metadata):
                # Only one kind of part per document: nothing to merge.
                yield partial_data
            elif content_disp.filename not in pre_formatted_data:
                # First part of a document: wait for its counterpart.
                pre_formatted_data[content_disp.filename] = partial_data
            else:
                data = pre_formatted_data[content_disp.filename]
                if content_disp.category == Category.CONTENT:
                    data.update(partial_data)
                    yield data
                else:
                    partial_data.update(data)
                    yield partial_data

    @classmethod
    def _pre_format_document(
        cls,
        parsed_resp: list[tuple],
        origin_uris: str | list[str] | tuple[str] | set[str],
        origin_category: str | list | None,
    ) -> Iterator[dict]:
        """Prepare a single-part document to initialize Document instances.

        The document URI is taken from the user input: the value itself for
        a string, the first element for a list or a tuple, and an arbitrary
        (first iterated) element for a set. An empty parsed response yields
        nothing.

        Parameters
        ----------
        parsed_resp : list[tuple]
            A parsed MarkLogic response parts with headers
        origin_uris
            Uris provided by the user
        origin_category : str | list | None
            Categories provided by the user

        Returns
        -------
        Iterator[dict]
            An iterator of pre-formatted data in form of dictionaries
        """
        if not parsed_resp:
            # Nothing was returned (e.g. a zero-length body): no documents.
            return

        headers, parsed_resp_body = parsed_resp[0]
        if isinstance(origin_uris, (list, tuple)):
            uri = origin_uris[0]
        elif isinstance(origin_uris, (set, frozenset)):
            # Sets are not indexable; take the first iterated element.
            uri = next(iter(origin_uris), None)
        else:
            uri = origin_uris

        expect_content, _ = cls._expect_categories(origin_category)
        if expect_content:
            yield {
                "uri": uri,
                "format": headers.get(constants.HEADER_NAME_ML_DOCUMENT_FORMAT),
                "content": parsed_resp_body,
            }
        else:
            yield {
                "uri": uri,
                "metadata": cls._pre_format_metadata(parsed_resp_body),
            }

    @classmethod
    def _pre_format_metadata(
        cls,
        raw_metadata: dict | bytes | str,
    ) -> dict | bytes | str:
        """Prepare raw metadata from a MarkLogic server response.

        For the dict type it replaces metadataValues key to metadata_values
        (if exists). The dictionary is modified in place. Other types are
        returned untouched.

        Parameters
        ----------
        raw_metadata : dict | bytes | str
            A raw metadata returned by a MarkLogic server

        Returns
        -------
        dict | bytes | str
            Metadata prepared to a Document instantiation.
        """
        if isinstance(raw_metadata, dict) and "metadataValues" in raw_metadata:
            raw_metadata["metadata_values"] = raw_metadata.pop("metadataValues")
        return raw_metadata

    @classmethod
    def _expect_categories(
        cls,
        origin_category: str | list | None,
    ) -> tuple[bool, bool]:
        """Return expectation flags based on categories sent by a user.

        Content is expected when no category is given or when the content
        category is among the given ones. Metadata is expected when any
        non-content category value is found in the given ones.

        Parameters
        ----------
        origin_category : str | list | None
            Categories provided by the user

        Returns
        -------
        tuple[bool, bool]
            Expectation flags informing whether data should contain content
            and/or metadata.
        """
        if not origin_category:
            # No category at all means the default one: content only.
            return True, False

        expect_content = Category.CONTENT.value in origin_category
        expect_metadata = any(
            cat.value in origin_category
            for cat in Category
            if cat != Category.CONTENT
        )
        return expect_content, expect_metadata

    @classmethod
    def _get_partial_data(
        cls,
        content_disp: DocumentsContentDisposition,
        parsed_resp_body: Any,
    ) -> dict:
        """Return pre-formatted partial data.

        Parameters
        ----------
        content_disp : DocumentsContentDisposition
            A content disposition of a response part
        parsed_resp_body : Any
            A parsed response part

        Returns
        -------
        dict
            Pre-formatted data in form of a dictionary
        """
        if content_disp.category == Category.CONTENT:
            return {
                "uri": content_disp.filename,
                "format": content_disp.format_,
                "content": parsed_resp_body,
            }
        return {
            "uri": content_disp.filename,
            "metadata": cls._pre_format_metadata(parsed_resp_body),
        }

    @classmethod
    def _parse_to_documents(
        cls,
        documents_data: Iterator[dict],
        output_type: type | None,
    ) -> list[Document]:
        """Parse pre-formatted data to a list of Document instances.

        Parameters
        ----------
        documents_data : Iterator[dict]
            An iterator of pre-formatted data in form of dictionaries
        output_type : type | None
            A raw output type (supported: str, bytes)

        Returns
        -------
        list[Document]
            A list of parsed Document instances
        """
        return [
            cls._parse_to_document(document_data, output_type)
            for document_data in documents_data
        ]

    @classmethod
    def _parse_to_document(
        cls,
        document_data: dict,
        output_type: type | None,
    ) -> Document:
        """Parse pre-formatted data to a Document instance.

        For str and bytes output types a raw document is built and metadata
        is passed as is. Otherwise non-empty metadata is converted to
        a Metadata instance first.

        Parameters
        ----------
        document_data : dict
            Pre-formatted data in form of a dictionary
        output_type : type | None
            A raw output type (supported: str, bytes)

        Returns
        -------
        Document
            A parsed Document instance
        """
        uri = document_data.get("uri")
        doc_format = document_data.get("format")
        content = document_data.get("content")
        metadata = document_data.get("metadata")

        if output_type in (bytes, str):
            factory_function = DocumentFactory.build_raw_document
        else:
            metadata = Metadata(**metadata) if metadata else metadata
            factory_function = DocumentFactory.build_document

        return factory_function(
            content=content,
            doc_type=doc_format,
            uri=uri,
            metadata=metadata,
        )