Charger un grand nombre de documents à l'aide d'une session de chargement.

Vous pouvez créer une session de chargement dans Paradigm, lui attribuer des documents et suivre la progression du chargement au fil du temps.

Vue d'ensemble


Plusieurs points de terminaison de l'API sont disponibles pour créer et gérer des sessions de chargement sur la plateforme Paradigm.

Le script présenté dans cet article fournit une implémentation asynchrone pour le chargement de plusieurs fichiers. Il utilise les bibliothèques asyncio et aiohttp de Python pour gérer efficacement les chargement de fichiers simultanés.

Avant de commencer les chargements, le script désactive automatiquement toutes les sessions existantes et crée une nouvelle session dédiée.

Vous pouvez également choisir l'endroit où vous souhaitez charger vos documents :
  • Dans votre espace privé
  • Dans l'espace de votre entreprise
  • Dans un espace de travail partagé avec d'autres membres de l'entreprise sélectionnés.

Par défaut, les documents seront chargés dans l'espace de l'entreprise.

Prérequis

  • Python 3.7+
  • Installation des paquets nécessaires : pip install aiohttp tqdm
    aiohttp : pour les requêtes HTTP asynchrones
    tqdm : Pour la visualisation de la barre de progression

Contrôle de la simultanéité

Le script utilise un sémaphore pour limiter le nombre de chargements simultanés, afin d'éviter une surcharge du serveur tout en maintenant un débit efficace. Ajustez le paramètre --batch_size en fonction des conditions de votre réseau et de la capacité de votre serveur.

Arguments de la ligne de commande

python upload_session_async.py
    --api-key="your_api_key » 
    --base-url="http://localhost:8000 »
    --files-dir="/path/to/files »
    --document-location="company »
    --workspace-id=123

Arguments Description

Argument Description Valeur par défaut

--api-key

(required)

 Clé de l'API Paradigm (peut également être définie via la variable d'environnement PARADIGM_API_KEY) Aucune
--base-url  URL de base de l'API Paradigm http://localhost:8000

--files-dir

(required)

 Répertoire contenant les fichiers à charger Aucune
--document-location  Où stocker les documents (choix : private, company, workspace) company
--workspace-id  ID de l'espace de travail pour le stockage de l'espace de travail (obligatoire si document_location est workspace) Aucune
--batch-size  Nombre maximal de chargements simultanés 5

API sur l'état de la session de chargement

Un point de terminaison API dédié est disponible pour surveiller la progression de la session de chargement, vérifier quels documents ont été incorporés avec succès et identifier les éventuels échecs.

Point de terminaison : GET /api/v2/upload-session/{uuid:uuid}

Réponse : Le point de terminaison renvoie des informations sur la session de chargement, notamment son identifiant unique, son état de validité, les dates de création et de dernière mise à jour, ainsi qu'une liste des documents chargés avec leur état (intégrés ou non).

Script de chargement complet

Le script ci-dessous permet un suivi en temps réel et garantit la transparence du flux de traitement des documents :

# /// script
# requires-python = ">=3.7"
# dependencies = [
#   "aiohttp",
#   "tqdm",
# ]
# ///

import argparse
import asyncio
import json
import logging
import os
from pathlib import Path

import aiohttp
from tqdm.asyncio import tqdm

logger = logging.getLogger(__name__)

def configure_logging(log_path=None, clean_logs=False):
    """Configure the logging system"""
    # Basic logger configuration
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)

    # Formatter for the logs
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

    # Handler for the console
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    root_logger.addHandler(console_handler)

    # Handler for the file if a path is specified
    if log_path:
        log_file = Path(log_path)

        # Clean existing logs if requested
        if clean_logs and log_file.exists():
            log_file.unlink()

        # Create parent directory if necessary
        log_file.parent.mkdir(parents=True, exist_ok=True)

        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        root_logger.addHandler(file_handler)
        logger.info(f"Logs will be saved to {log_file.absolute()}")

class AsyncUploadSession:
    def __init__(
        self, base_url: str, api_key: str, document_location: str, workspace_id: int | None = None, batch_size: int = 5
    ):
        self.base_url = base_url
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.document_location = document_location
        self.workspace_id = workspace_id
        self.batch_size = batch_size

    async def create_session(self, session: aiohttp.ClientSession) -> dict:
        """Create a new upload session"""
        # First, deactivate any existing sessions
        async with session.post(f"{self.base_url}/api/v2/upload-session/deactivate", headers=self.headers) as response:
            await response.json()
            logger.info("Successfully deactivated existing sessions")

        # Then create a new session
        async with session.post(f"{self.base_url}/api/v2/upload-session", headers=self.headers) as response:
            response.raise_for_status()
            session_data = await response.json()
            logger.info("Successfully created a new session")
            return session_data

    async def upload_file(
        self,
        session: aiohttp.ClientSession,
        session_id: str,
        file_path: Path,
        semaphore: asyncio.Semaphore,
    ) -> tuple[Path, dict]:
        """Upload a single file to the session with additional data"""
        # Use the semaphore to limit concurrent calls
        async with semaphore:
            data = aiohttp.FormData()
            data.add_field("description", "File upload")
            data.add_field("session_id", session_id)
            data.add_field("collection_type", self.document_location)

            if self.document_location == "workspace" and self.workspace_id:
                data.add_field("workspace_id", str(self.workspace_id))

            with open(file_path, "rb") as file_content:
                data.add_field("file", file_content, filename=file_path.name, content_type="application/octet-stream")

                async with session.post(
                    f"{self.base_url}/api/v2/upload-session/{session_id}", headers=self.headers, data=data
                ) as response:
                    status = response.status
                    response_body = await response.text()
                    if status >= 400:
                        logger.error(f"Error {status} uploading file {file_path.name}: {response_body}")
                        response.raise_for_status()
                    result = json.loads(response_body)
                    return file_path, result

    async def upload_files(self, files_dir: Path):
        async with aiohttp.ClientSession() as session:
            semaphore = asyncio.Semaphore(self.batch_size)
            logger.info(f"Limiting concurrent uploads to {self.batch_size}")

            session_data = await self.create_session(session)
            session_id = session_data["uuid"]

            # Retrieve the list of files (including in subfolders)
            files = [f for f in files_dir.rglob("*") if f.is_file()]

            tasks = [self.upload_file(session, session_id, file_path, semaphore) for file_path in files]

            async def safe_task(task):
                try:
                    return await task
                except Exception as e:
                    return e

            results = await tqdm.gather(*[safe_task(t) for t in tasks])

            successful_uploads = []
            failed_uploads = []

            for file_path, result in zip(files, results):
                if isinstance(result, Exception):
                    logger.error(f"Failed to upload file: {file_path.name}: {result}")
                    failed_uploads.append(file_path.name)
                else:
                    successful_uploads.append(file_path.name)

            # Display the summary
            logger.info("=" * 50)
            logger.info(f"Upload session summary for {len(files)} files:")
            logger.info(f"✅ Successfully uploaded: {len(successful_uploads)} files")
            if failed_uploads:
                logger.info(f"❌ Failed to upload: {len(failed_uploads)} files")
                logger.info("Failed files:")
                for failed_file in failed_uploads:
                    logger.info(f"  - {failed_file}")
            logger.info("=" * 50)

            return {"total": len(files), "successful": successful_uploads, "failed": failed_uploads}

async def main_async():
    parser = argparse.ArgumentParser(description="Usable script to upload multiple files to Paradigm")
    parser.add_argument("--api-key", default=None, help="Paradigm API key to use.")
    parser.add_argument("--base-url", default="<http://localhost:8000>", help="Base url to use.")
    parser.add_argument("--files-dir", type=Path, required=True, help="Directory containing files to upload")
    parser.add_argument(
        "--document-location",
        type=str,
        choices=["private", "company", "workspace"],
        default="company",
        help="Paradigm space to store the documents into. Defaults to the company space.",
    )
    parser.add_argument(
        "--workspace-id",
        type=int,
        help="The ID of the workspace to store the documents into. "
        "Only applicable if the document location is set to 'workspace'.",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=5,
        help="Maximum number of concurrent uploads to perform",
    )
    parser.add_argument(
        "--log-path",
        type=str,
        default="upload_session.log",
        help="Path where to save logs. If not provided, logs will only be displayed in console.",
    )
    parser.add_argument(
        "--clean-logs",
        action="store_true",
        help="If set, existing log file will be deleted before starting.",
    )
    args = parser.parse_args()

    # Configure the logging system
    configure_logging(args.log_path, args.clean_logs)

    if args.api_key is None:
        api_key = os.getenv("PARADIGM_API_KEY", None)
    else:
        api_key = args.api_key

    assert api_key is not None, "No API key provided."

    # Check if the directory exists
    if not args.files_dir.exists():
        raise FileNotFoundError(f"Directory not found: {args.files_dir}")

    logger.info(f"Using directory: {args.files_dir.absolute()}")
    files = list(args.files_dir.glob("*"))
    logger.info(f"Found files: {[f.name for f in files]}")

    uploader = AsyncUploadSession(args.base_url, api_key, args.document_location, args.workspace_id, args.batch_size)
    await uploader.upload_files(args.files_dir)

def main():
    asyncio.run(main_async())

if __name__ == "__main__":
    main()