Upload a large number of documents with an Upload Session.

You can create an Upload Session in Paradigm, attach documents to it, and track the state of the session over time.

Overview

Several API endpoints are available to create and manage Upload Sessions in the Paradigm platform.

The script presented in this article provides an asynchronous implementation for uploading multiple files. It uses Python's asyncio and aiohttp libraries to handle concurrent file uploads efficiently.

Before starting any uploads, the script automatically deactivates all existing upload sessions and creates a new dedicated session.
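These two steps use the POST /api/v2/upload-session/deactivate and POST /api/v2/upload-session endpoints, the same calls made by the full script further down. Below is a minimal standalone sketch of that sequence, assuming Bearer authentication with your API key and that the creation response contains a uuid field (as the full script expects):

import asyncio

import aiohttp

async def start_fresh_session(base_url: str, api_key: str) -> str:
    """Deactivate any active upload sessions, then create a new one and return its UUID."""
    headers = {"Authorization": f"Bearer {api_key}"}
    async with aiohttp.ClientSession() as session:
        # Deactivate sessions that are still marked as active
        async with session.post(f"{base_url}/api/v2/upload-session/deactivate", headers=headers) as response:
            response.raise_for_status()
        # Create the new session; the response carries its unique identifier
        async with session.post(f"{base_url}/api/v2/upload-session", headers=headers) as response:
            response.raise_for_status()
            session_data = await response.json()
            return session_data["uuid"]

# Example usage (values are placeholders):
# session_uuid = asyncio.run(start_fresh_session("http://localhost:8000", "your_api_key"))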

You can choose where to upload your documents:

  • In your private space
  • In your company space
  • In a workspace shared with other selected company members

By default, the documents will be uploaded to the company space.
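Within each upload request, this choice is sent as a collection_type form field, with an additional workspace_id field when targeting a workspace; these are the field names used by the full script below. A minimal sketch of assembling that form data:

from __future__ import annotations

import aiohttp

def build_upload_form(document_location: str, workspace_id: int | None = None) -> aiohttp.FormData:
    """Build the multipart form fields describing where a document should be stored."""
    data = aiohttp.FormData()
    data.add_field("collection_type", document_location)  # "private", "company" or "workspace"
    if document_location == "workspace" and workspace_id is not None:
        data.add_field("workspace_id", str(workspace_id))
    return data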

Prerequisites

  • Python 3.7+
  • Required packages, installed with pip install aiohttp tqdm:
    aiohttp: for asynchronous HTTP requests
    tqdm: for progress bar visualization

Concurrency Control

The script uses a semaphore to limit the number of concurrent uploads, preventing server overload while maintaining efficient throughput. Adjust the --batch-size parameter based on your network conditions and server capacity.
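As an isolated illustration of this pattern, here is a minimal sketch using a placeholder coroutine in place of a real upload call:

from __future__ import annotations

import asyncio

async def upload_one(name: str) -> str:
    # Placeholder standing in for a real upload request
    await asyncio.sleep(0.1)
    return name

async def upload_all(names: list[str], batch_size: int = 5) -> list[str]:
    # At most batch_size uploads are in flight at any time
    semaphore = asyncio.Semaphore(batch_size)

    async def bounded(name: str) -> str:
        async with semaphore:
            return await upload_one(name)

    return await asyncio.gather(*(bounded(n) for n in names))

# asyncio.run(upload_all([f"file_{i}.pdf" for i in range(20)], batch_size=5))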

Command Line Arguments

python upload_session_async.py \
  --api-key="your_api_key" \
  --base-url="http://localhost:8000" \
  --files-dir="/path/to/files" \
  --document-location="company" \
  --workspace-id=123

Arguments Description

  • --api-key (required): Paradigm API key (can also be set via the PARADIGM_API_KEY environment variable). Default: None
  • --base-url: Base URL of the Paradigm API. Default: http://localhost:8000
  • --files-dir (required): Directory containing files to upload. Default: None
  • --document-location: Where to store documents (choices: private, company, workspace). Default: company
  • --workspace-id: Workspace ID for workspace storage (required if --document-location is workspace). Default: None
  • --batch-size: Maximum number of concurrent uploads. Default: 5
  • --log-path: Path of the log file, written in addition to the console output. Default: upload_session.log
  • --clean-logs: If set, the existing log file is deleted before starting. Default: not set

Upload Session Status API

A dedicated API endpoint is available to monitor the progress of the upload session, check which documents have been embedded successfully, and identify any failures.

Endpoint: GET /api/v2/upload-session/{uuid}

Response: The endpoint returns details about the upload session, including its unique identifier, validity status, timestamps for creation and last update, and a list of uploaded documents with their statuses (embedded or failed).
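A minimal sketch of such a status check, assuming the same Bearer authentication as the upload script; the response schema is not reproduced here, so the example simply returns the parsed JSON:

import asyncio

import aiohttp

async def get_session_status(base_url: str, api_key: str, session_uuid: str) -> dict:
    """Fetch the current state of an upload session."""
    headers = {"Authorization": f"Bearer {api_key}"}
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{base_url}/api/v2/upload-session/{session_uuid}", headers=headers) as response:
            response.raise_for_status()
            return await response.json()

# Example usage (values are placeholders):
# status = asyncio.run(get_session_status("http://localhost:8000", "your_api_key", "session-uuid"))
# print(status)  # lists the uploaded documents and whether each was embedded or failed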

Full Upload Script

The script below provides real-time tracking and ensures transparency in the document processing workflow:

# /// script
# requires-python = ">=3.7"
# dependencies = [
#   "aiohttp",
#   "tqdm",
# ]
# ///

from __future__ import annotations

import argparse
import asyncio
import json
import logging
import os
from pathlib import Path

import aiohttp
from tqdm.asyncio import tqdm

logger = logging.getLogger(__name__)

def configure_logging(log_path=None, clean_logs=False):
    """Configure the logging system"""
    # Basic logger configuration
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)

    # Formatter for the logs
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

    # Handler for the console
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    root_logger.addHandler(console_handler)

    # Handler for the file if a path is specified
    if log_path:
        log_file = Path(log_path)

        # Clean existing logs if requested
        if clean_logs and log_file.exists():
            log_file.unlink()

        # Create parent directory if necessary
        log_file.parent.mkdir(parents=True, exist_ok=True)

        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        root_logger.addHandler(file_handler)
        logger.info(f"Logs will be saved to {log_file.absolute()}")

class AsyncUploadSession:
    def __init__(
        self, base_url: str, api_key: str, document_location: str, workspace_id: int | None = None, batch_size: int = 5
    ):
        self.base_url = base_url
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.document_location = document_location
        self.workspace_id = workspace_id
        self.batch_size = batch_size

    async def create_session(self, session: aiohttp.ClientSession) -> dict:
        """Create a new upload session"""
        # First, deactivate any existing sessions
        async with session.post(f"{self.base_url}/api/v2/upload-session/deactivate", headers=self.headers) as response:
            response.raise_for_status()
            await response.json()
            logger.info("Successfully deactivated existing sessions")

        # Then create a new session
        async with session.post(f"{self.base_url}/api/v2/upload-session", headers=self.headers) as response:
            response.raise_for_status()
            session_data = await response.json()
            logger.info("Successfully created a new session")
            return session_data

    async def upload_file(
        self,
        session: aiohttp.ClientSession,
        session_id: str,
        file_path: Path,
        semaphore: asyncio.Semaphore,
    ) -> tuple[Path, dict]:
        """Upload a single file to the session with additional data"""
        # Use the semaphore to limit concurrent calls
        async with semaphore:
            data = aiohttp.FormData()
            data.add_field("description", "File upload")
            data.add_field("session_id", session_id)
            data.add_field("collection_type", self.document_location)

            if self.document_location == "workspace" and self.workspace_id:
                data.add_field("workspace_id", str(self.workspace_id))

            with open(file_path, "rb") as file_content:
                data.add_field("file", file_content, filename=file_path.name, content_type="application/octet-stream")

                async with session.post(
                    f"{self.base_url}/api/v2/upload-session/{session_id}", headers=self.headers, data=data
                ) as response:
                    status = response.status
                    response_body = await response.text()
                    if status >= 400:
                        logger.error(f"Error {status} uploading file {file_path.name}: {response_body}")
                        response.raise_for_status()
                    result = json.loads(response_body)
                    return file_path, result

    async def upload_files(self, files_dir: Path):
        async with aiohttp.ClientSession() as session:
            semaphore = asyncio.Semaphore(self.batch_size)
            logger.info(f"Limiting concurrent uploads to {self.batch_size}")

            session_data = await self.create_session(session)
            session_id = session_data["uuid"]

            # Retrieve the list of files (including in subfolders)
            files = [f for f in files_dir.rglob("*") if f.is_file()]

            tasks = [self.upload_file(session, session_id, file_path, semaphore) for file_path in files]

            async def safe_task(task):
                try:
                    return await task
                except Exception as e:
                    return e

            results = await tqdm.gather(*[safe_task(t) for t in tasks])

            successful_uploads = []
            failed_uploads = []

            for file_path, result in zip(files, results):
                if isinstance(result, Exception):
                    logger.error(f"Failed to upload file: {file_path.name}: {result}")
                    failed_uploads.append(file_path.name)
                else:
                    successful_uploads.append(file_path.name)

            # Display the summary
            logger.info("=" * 50)
            logger.info(f"Upload session summary for {len(files)} files:")
            logger.info(f"✅ Successfully uploaded: {len(successful_uploads)} files")
            if failed_uploads:
                logger.info(f"❌ Failed to upload: {len(failed_uploads)} files")
                logger.info("Failed files:")
                for failed_file in failed_uploads:
                    logger.info(f"  - {failed_file}")
            logger.info("=" * 50)

            return {"total": len(files), "successful": successful_uploads, "failed": failed_uploads}

async def main_async():
    parser = argparse.ArgumentParser(description="Script to upload multiple files to Paradigm")
    parser.add_argument("--api-key", default=None, help="Paradigm API key to use.")
    parser.add_argument("--base-url", default="http://localhost:8000", help="Base URL of the Paradigm API.")
    parser.add_argument("--files-dir", type=Path, required=True, help="Directory containing files to upload")
    parser.add_argument(
        "--document-location",
        type=str,
        choices=["private", "company", "workspace"],
        default="company",
        help="Paradigm space to store the documents into. Defaults to the company space.",
    )
    parser.add_argument(
        "--workspace-id",
        type=int,
        help="The ID of the workspace to store the documents into. "
        "Only applicable if the document location is set to 'workspace'.",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=5,
        help="Maximum number of concurrent uploads to perform",
    )
    parser.add_argument(
        "--log-path",
        type=str,
        default="upload_session.log",
        help="Path where to save logs. If not provided, logs will only be displayed in console.",
    )
    parser.add_argument(
        "--clean-logs",
        action="store_true",
        help="If set, existing log file will be deleted before starting.",
    )
    args = parser.parse_args()

    # Configure the logging system
    configure_logging(args.log_path, args.clean_logs)

    if args.api_key is None:
        api_key = os.getenv("PARADIGM_API_KEY", None)
    else:
        api_key = args.api_key

    assert api_key is not None, "No API key provided."

    # Check if the directory exists
    if not args.files_dir.exists():
        raise FileNotFoundError(f"Directory not found: {args.files_dir}")

    logger.info(f"Using directory: {args.files_dir.absolute()}")
    # List files recursively, matching what upload_files will actually process
    files = [f for f in args.files_dir.rglob("*") if f.is_file()]
    logger.info(f"Found files: {[f.name for f in files]}")

    uploader = AsyncUploadSession(args.base_url, api_key, args.document_location, args.workspace_id, args.batch_size)
    await uploader.upload_files(args.files_dir)

def main():
    asyncio.run(main_async())

if __name__ == "__main__":
    main()