Add a large number of documents

Here is a standard script to upload a large number of documents to Paradigm.

The number of documents uploaded per minute is limited to 6.

An example command to launch the script:

python3 script.py /path/to/files/folder --api_key <my_api_key>
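
The API key can also be provided through the PARADIGM_API_KEY environment variable instead of the --api_key option (see the end of the script):

export PARADIGM_API_KEY=<my_api_key>
python3 script.py /path/to/files/folder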
 

The script below displays a progress bar and sends the files in parallel batches of a given size (3 by default; if you run into rate-limiting issues, try reducing the batch size).
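
For example, to send the files one at a time:

python3 script.py /path/to/files/folder --api_key <my_api_key> --batch_size 1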

You can choose where to upload your documents: to your private space, to your company's space, or to a workspace shared with a restricted set of members of your company.

By default, documents are uploaded to the company space.

If you want to upload documents to a workspace, you must also specify your workspace ID (which can be retrieved from the administration panel).
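
For example (the workspace ID 42 below is a placeholder; replace it with your own):

python3 script.py /path/to/files/folder --api_key <my_api_key> --document_location workspace --workspace_id 42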

Options

uploadPARADIGM.py [-h] [--log_path LOG_PATH] [--error_path ERROR_PATH]
                         [--clean_log_files] [--api_key API_KEY]
                         [--base_url BASE_URL] [--batch_size BATCH_SIZE]
                         [--document_location {private,company,workspace}]
                         [--workspace_id WORKSPACE_ID]
                         folder_path

positional arguments:
  folder_path           Path to the folder containing the documents to upload.

options:
  -h, --help            show this help message and exit
  --log_path LOG_PATH   Path to the JSONL file where successful uploads are logged.
  --error_path ERROR_PATH
                        Path to the JSONL file where failed uploads are logged.
  --clean_log_files     Clear the log files before starting.
  --api_key API_KEY     Paradigm API key to use.
  --base_url BASE_URL   Base url to use.
  --batch_size BATCH_SIZE
                        Number of parallel upload tasks to run.
  --document_location {private,company,workspace}
                        Paradigm space to store the documents into. Defaults to the company space.
  --workspace_id WORKSPACE_ID
                        The ID of the workspace to store the documents into. Only applicable if the
                        document location is set to 'workspace'.
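
The two log files double as a resume mechanism: on a new run, files already listed in the upload log or the error log are skipped, so an interrupted upload can simply be restarted with the same command. To start over from scratch instead, clear the logs:

python3 script.py /path/to/files/folder --api_key <my_api_key> --clean_log_files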

Complete script

import dataclasses
from dataclasses import dataclass
from typing import Union, List, Literal, Tuple
from pathlib import Path
import os
import asyncio
from tqdm.asyncio import tqdm
import json

from openai import AsyncOpenAI
from openai.types import FileObject

SUPPORTED_FILE_FORMATS = ["PDF", "pdf", "docx", "DOCX", "doc", "DOC", "md", "MD"]

# Document processing statuses returned by the API, grouped by outcome.
ONGOING_STATUS = ["pending", "parsing", "embedding", "updating"]
SUCCESSFUL_STATUS = ["embedded", "success"]
FAILED_STATUS = ["parsing_failed", "embedding_failed", "fail", "failed"]


@dataclass
class UploadResponse:
    id: Union[str, None]
    bytes: Union[int, None]
    file_path: Path
    relative_path: Path
    filename: Union[str, None]
    status: str
    fail_reason: Union[str, None]


@dataclass
class LogFileElement:
    id: str
    filename: str
    relative_path: str
    status: str
    fail_reason: Union[str, None]


class Utility:
    """Helpers to list and retrieve files through the paginated files endpoint."""

    @staticmethod
    async def get_files(client: AsyncOpenAI) -> List[FileObject]:
        files = []
        page_number = 1
        while True:
            list_response = await client.files.list(extra_query={"page": page_number})
            files += list_response.data
            if not list_response.has_next_page():
                break
            page_number += 1
        return files

    @staticmethod
    async def get_file(client: AsyncOpenAI, file_id: str) -> Union[FileObject, None]:
        page_number = 1
        while True:
            list_response = await client.files.list(extra_query={"page": page_number})
            for file in list_response.data:
                if file.id == file_id:
                    return file
            if not list_response.has_next_page():
                break
            page_number += 1
        return None


class FileUploader:
    """Uploads every supported document under a folder, logging results to JSONL files."""

    def __init__(
            self, folder_path: Path, base_url: str, api_key: str,
            log_path: Path, error_log_path: Path, clean_logs: bool,
            batch_size: int, document_location: str, workspace_id: Union[int, None] = None
    ):
        self.client = AsyncOpenAI(api_key=api_key, base_url=base_url)
        self.folder_path = folder_path
        self.log_path = log_path
        self.error_log_path = error_log_path
        self.clean_logs = clean_logs
        self.batch_size = batch_size
        self.document_location = document_location
        self.workspace_id = workspace_id

    def _load_uploaded_files(self) -> List[LogFileElement]:
        data = []
        if self.log_path.exists():
            with open(self.log_path, "r") as f:
                data = [LogFileElement(**json.loads(line)) for line in f]
        return data

    def _load_failed_files(self) -> List[LogFileElement]:
        data = []
        if self.error_log_path.exists():
            with open(self.error_log_path, "r") as f:
                data = [LogFileElement(**json.loads(line)) for line in f]
        return data

    def _log_file(self, file_info: UploadResponse, mode: Literal["info", "error"]):
        data_to_log = LogFileElement(
            id=file_info.id,
            filename=file_info.filename,
            relative_path=str(file_info.relative_path),
            status=file_info.status,
            fail_reason=file_info.fail_reason
        )
        file_to_use = self.log_path if mode == "info" else self.error_log_path
        with open(file_to_use, "a") as f:
            json.dump(dataclasses.asdict(data_to_log), f, ensure_ascii=False)
            f.write("\n")

    async def _file_is_processed(self, file_id: str, retries: int, delay: float) -> Tuple[bool, str]:
        # Poll the file status until it succeeds, fails, or the retries run out.
        status = "not_found"
        for _ in range(retries):
            file = await Utility.get_file(self.client, file_id)
            if file is not None:
                status = file.status
                if status in SUCCESSFUL_STATUS:
                    return True, status
                if status in FAILED_STATUS:
                    return False, status
            await asyncio.sleep(delay)
        # Still ongoing (or never found) after all retries: report as not processed.
        return False, status

    async def _upload_file(self, semaphore: asyncio.Semaphore, file_path: Path):
        # Upload a single file, wait for it to be processed, and log the outcome.
        async with semaphore:
            log_info = UploadResponse(
                id=None,
                bytes=None,
                file_path=file_path,
                relative_path=file_path.relative_to(self.folder_path),
                filename=None,
                status="failed",
                fail_reason=None
            )
            try:
                if os.path.getsize(file_path) > 0:
                    extra_body = {"collection_type": self.document_location}
                    if self.document_location == "workspace":
                        assert self.workspace_id is not None
                        extra_body["workspace_id"] = self.workspace_id
                    with open(file_path, "rb") as f:
                        response = await self.client.files.create(
                            file=f, purpose="documents", extra_body=extra_body
                        )
                    log_info.id = response.id
                    log_info.bytes = response.bytes
                    log_info.filename = response.filename
                    log_info.status = response.status

                    _, status = await self._file_is_processed(response.id, retries=10, delay=2)
                    log_info.status = status
                else:
                    log_info.bytes = 0
                    log_info.fail_reason = "Empty file detected"
            except Exception as e:
                log_info.fail_reason = str(e)

            if log_info.status in SUCCESSFUL_STATUS:
                self._log_file(log_info, "info")
            else:
                self._log_file(log_info, "error")

    async def upload_files(self):
        """Upload every supported file under folder_path, skipping files already logged."""
        if self.clean_logs:
            self.log_path.unlink(missing_ok=True)
            self.error_log_path.unlink(missing_ok=True)

        semaphore = asyncio.Semaphore(self.batch_size)

        uploaded_files = self._load_uploaded_files()
        uploaded_file_paths = [file.relative_path for file in uploaded_files]
        failed_files = self._load_failed_files()
        # If no ID has been attributed to a file, then an unexpected error has happened
        failed_files_paths = [file.relative_path for file in failed_files if file.id is not None]
        unexpected_error_file_paths = [file.relative_path for file in failed_files if file.id is None]

        print(f"Detected uploaded files: {len(uploaded_file_paths)}")
        print(f"Detected failed files: {len(failed_files_paths)}")
        print(f"Detected unexpected error files: {len(unexpected_error_file_paths)}")

        files_to_upload = []
        for file_format in SUPPORTED_FILE_FORMATS:
            for file in self.folder_path.rglob(f'*.{file_format}'):
                relative_path = str(file.relative_to(self.folder_path))
                if relative_path in uploaded_file_paths:
                    continue
                elif relative_path in failed_files_paths:
                    continue
                elif relative_path in unexpected_error_file_paths:
                    files_to_upload.append(file)
                else:
                    files_to_upload.append(file)

        print(f"Detected files to upload: {len(files_to_upload)}")

        with tqdm(total=len(files_to_upload)) as pb:
            tasks = [
                self._upload_file(semaphore, file) for file in files_to_upload
            ]
            for task in asyncio.as_completed(tasks):
                await task
                pb.update(1)


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="Usable script to upload multiple files to Paradigm"
    )
    parser.add_argument(
        "folder_path", type=Path,
        help="Path to the folder containing the documents to upload."
    )
    parser.add_argument(
        "--log_path", default=Path.cwd() / Path("log_files.jsonl"), type=Path,
        help="Path to the JSON file to log file upload."
    )
    parser.add_argument(
        "--error_path", default=Path.cwd() / Path("error_files.jsonl"), type=Path,
        help="Path to the JSON file to log failed uploads."
    )
    parser.add_argument(
        "--clean_log_files", action="store_true",
        help="Clear the log files before starting."
    )
    parser.add_argument(
        "--api_key", default=None,
        help="Paradigm API key to use."
    )
    parser.add_argument(
        "--base_url", default="https://paradigm.lighton.ai/api/v2",
        help="Base url to use."
    )
    parser.add_argument(
        "--batch_size", default=3, type=int,
        help="Number of parallel processes to run"
    )
    parser.add_argument(
        "--document_location", type=str, choices=["private", "company", "workspace"], default="company",
        help="Paradigm space to store the documents into. Defaults to the company space."
    )
    parser.add_argument(
        "--workspace_id", type=int,
        help="The ID of the workspace to store the documents into. Only applicable if the document location is set to 'workspace'."
    )
    args = parser.parse_args()

    if args.api_key is None:
        api_key = os.getenv("PARADIGM_API_KEY", None)
    else:
        api_key = args.api_key

    assert api_key is not None, "No API key provided."

    # Initialize the FileUploader instance
    file_uploader = FileUploader(
        folder_path=args.folder_path,
        base_url=args.base_url,
        api_key=api_key,
        log_path=args.log_path,
        error_log_path=args.error_path,
        clean_logs=args.clean_log_files,
        batch_size=args.batch_size,
        document_location=args.document_location,
        workspace_id=args.workspace_id
    )

    # Run the upload_files method
    asyncio.run(file_uploader.upload_files())
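
By default the script writes two JSONL logs in the current working directory: log_files.jsonl for successful uploads and error_files.jsonl for failures. Use --log_path and --error_path to store them elsewhere.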