Here is a standard script for uploading a large number of documents to Paradigm.
Uploads are limited to 6 documents per minute.
An example command to run the script:
python3 upload_paradigm.py /path/to/files/folder --api_key <my_api_key>
The script below displays a progress bar and sends the files in parallel batches of a given size (3 by default; if you run into rate-limit issues, try reducing the batch size).
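For example, to send only one file at a time (the path and API key are placeholders):
python3 upload_paradigm.py /path/to/files/folder --api_key <my_api_key> --batch_size 1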
You can choose where to upload your documents: to your private space, to your company space, or to a workspace shared with a restricted group of members of your company.
By default, documents are uploaded to the company space.
If you want to upload documents to a workspace, you must also specify your workspace ID (which can be retrieved from the admin panel).
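For example, an upload to a workspace could look like this (the workspace ID is a placeholder):
python3 upload_paradigm.py /path/to/files/folder --api_key <my_api_key> --document_location workspace --workspace_id <my_workspace_id>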
Options
usage: upload_paradigm.py [-h] [--log_path LOG_PATH] [--error_path ERROR_PATH]
                          [--clean_logs] [--api_key API_KEY] [--base_url BASE_URL]
                          [--batch_size BATCH_SIZE] [--upload_timeout UPLOAD_TIMEOUT]
                          [--document_location {private,company,workspace}]
                          [--workspace_id WORKSPACE_ID]
                          folder_path
positional arguments:
  folder_path           Path to the folder containing the documents to upload.

options:
  -h, --help            show this help message and exit
  --log_path LOG_PATH   Path to the JSONL file used to log successful uploads.
  --error_path ERROR_PATH
                        Path to the JSONL file used to log failed uploads.
  --clean_logs          Clear log files before starting.
  --api_key API_KEY     Paradigm API key to use.
  --base_url BASE_URL   Base url to use.
  --batch_size BATCH_SIZE
                        Number of parallel uploads to run
  --upload_timeout UPLOAD_TIMEOUT
                        Timeout in seconds for a file to be uploaded
  --document_location {private,company,workspace}
                        Paradigm space to store the documents into. Defaults to the company space.
  --workspace_id WORKSPACE_ID
                        The ID of the workspace to store the documents into. Only applicable if the
                        document location is set to 'workspace'.
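If --api_key is not provided, the script falls back to the PARADIGM_API_KEY environment variable, so you can, for example, export the key once and omit the flag:
export PARADIGM_API_KEY=<my_api_key>
python3 upload_paradigm.py /path/to/files/folder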
Full script
import dataclasses
import json
import os
import asyncio
from dataclasses import dataclass
from pathlib import Path
from typing import Union, Tuple
from tqdm.asyncio import tqdm
from openai import AsyncOpenAI
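# Supported file extensions and the document status values used to track upload progress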
SUPPORTED_FILE_FORMATS = {"pdf", "docx", "doc", "md"}
ONGOING_STATUS = {"pending", "parsing", "embedding", "updating"}
SUCCESSFUL_STATUS = {"embedded", "success"}
FAILED_STATUS = {"parsing_failed", "embedding_failed", "fail", "failed"}
def load_log_file(log_path: Path):
    """Load logs safely, ignoring empty or invalid JSON lines."""
    entries = []
    if log_path.exists() and log_path.stat().st_size > 0:
        with open(log_path, "r", encoding="utf-8") as file:
            for line in file:
                line = line.strip()
                if line:  # Ignore empty lines
                    try:
                        entries.append(json.loads(line))
                    except json.JSONDecodeError as e:
                        print(f"Skipping invalid log entry: {line} | Error: {e}")
    return entries

def save_log_entry(log_path: Path, entry: dict):
    """Ensure all Path objects are converted to strings before logging."""
    entry = {key: str(value) if isinstance(value, Path) else value for key, value in entry.items()}
    with open(log_path, "a", encoding="utf-8") as file:
        json.dump(entry, file, ensure_ascii=False)
        file.write("\n")

@dataclass
class UploadResponse:
    id: Union[str, None]
    bytes: Union[int, None]
    file_path: Path
    relative_path: Path
    filename: Union[str, None]
    status: str
    fail_reason: Union[str, None]

class FileUploader:
    def __init__(
        self, folder_path: Path, base_url: str, api_key: str, log_path: Path,
        error_log_path: Path, clean_logs: bool, batch_size: int, upload_timeout: float,
        document_location: str, workspace_id: Union[int, None] = None
    ):
        self.client = AsyncOpenAI(api_key=api_key, base_url=base_url)
        self.folder_path = folder_path
        self.log_path = log_path
        self.error_log_path = error_log_path
        self.clean_logs = clean_logs
        self.batch_size = batch_size
        self.upload_timeout = upload_timeout
        self.document_location = document_location
        self.workspace_id = workspace_id
        if clean_logs:
            self.log_path.unlink(missing_ok=True)
            self.error_log_path.unlink(missing_ok=True)

    async def _file_is_processed(self, file_id: str, timeout: float) -> Tuple[bool, str]:
        """Checks if the file has been processed within the given timeout."""
        async def check_status():
            while True:
                file = await self.client.files.retrieve(file_id=file_id)
                if file.status in FAILED_STATUS:
                    return False, file.status
                if file.status in SUCCESSFUL_STATUS:
                    return True, file.status
                if file.status not in ONGOING_STATUS:
                    raise Exception(f"Unexpected file status: {file.status}")
                await asyncio.sleep(2)

        try:
            return await asyncio.wait_for(check_status(), timeout=timeout)
        except asyncio.TimeoutError:
            result = await self.client.files.delete(file_id=file_id)
            if not result.deleted:
                raise Exception(f"Failed to delete timed out file: {file_id}")
            raise Exception("File upload timed out")

    async def _upload_file(self, semaphore: asyncio.Semaphore, file_path: Path):
        async with semaphore:
            try:
                if file_path.stat().st_size == 0:
                    raise Exception("Empty file detected")
                else:
                    extra_body = {"collection_type": self.document_location}
                    if self.document_location == "workspace" and self.workspace_id:
                        extra_body["workspace_id"] = self.workspace_id
                    with file_path.open("rb") as file:
                        response = await self.client.files.create(file=file, purpose="documents", extra_body=extra_body)
                    response_data = UploadResponse(
                        id=response.id, bytes=response.bytes, file_path=file_path,
                        relative_path=file_path.relative_to(self.folder_path), filename=response.filename,
                        status=response.status, fail_reason=None
                    )
                    success, status = await self._file_is_processed(response.id, timeout=self.upload_timeout)
                    response_data.status = status
                    save_log_entry(self.log_path if success else self.error_log_path, dataclasses.asdict(response_data))
            except Exception as e:
                response_data = UploadResponse(
                    id=None, bytes=file_path.stat().st_size, file_path=file_path,
                    relative_path=file_path.relative_to(self.folder_path),
                    filename=None, status="fail", fail_reason=str(e)
                )
                save_log_entry(self.error_log_path, dataclasses.asdict(response_data))

    async def upload_files(self):
        semaphore = asyncio.Semaphore(self.batch_size)
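        # Files already recorded in the success log are skipped, so an interrupted run can be resumed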
        existing_logs = {entry["relative_path"] for entry in load_log_file(self.log_path)}
        files_to_upload = [file for file in self.folder_path.rglob("*") if
                           file.suffix[1:].lower() in SUPPORTED_FILE_FORMATS]
        files_to_upload = [file for file in files_to_upload if
                           str(file.relative_to(self.folder_path)) not in existing_logs]
        print(f"Files to upload: {len(files_to_upload)}")
        with tqdm(total=len(files_to_upload)) as progress:
            tasks = [self._upload_file(semaphore, file) for file in files_to_upload]
            for task in asyncio.as_completed(tasks):
                await task
                progress.update(1)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(
description="Usable script to upload multiple files to Paradigm"
)
parser.add_argument(
"folder_path", type=Path,
help="Path to the folder containing the documents to upload."
)
parser.add_argument(
"--log_path", default=Path.cwd() / Path("log_files.jsonl"), type=Path,
help="Path to the JSON file to log file upload."
)
parser.add_argument(
"--error_path", default=Path.cwd() / Path("error_files.jsonl"), type=Path,
help="Path to the JSON file to log failed uploads."
)
parser.add_argument(
"--clean_logs", action="store_true",
help="Clear log files before starting."
)
parser.add_argument(
"--api_key", default=None,
help="Paradigm API key to use."
)
parser.add_argument(
"--base_url", default="https://paradigm.lighton.ai/api/v2",
help="Base url to use."
)
parser.add_argument(
"--batch_size", default=3, type=int,
help="Number of parallel processes to run"
)
parser.add_argument(
"--upload_timeout", default=120, type=int, help="Timeout in seconds for a file to be uploaded"
)
parser.add_argument(
"--document_location", type=str, choices=["private", "company", "workspace"], default="company",
help="Paradigm space to store the documents into. Defaults to the company space."
)
parser.add_argument(
"--workspace_id", type=int,
help="The ID of the workspace to store the documents into. Only applicable if the document location is set to 'workspace'."
)
args = parser.parse_args()
if args.api_key is None:
api_key = os.getenv("PARADIGM_API_KEY", None)
else:
api_key = args.api_key
assert api_key is not None, "No API key provided."
asyncio.run(
FileUploader(
folder_path=args.folder_path,
base_url=args.base_url,
api_key=api_key,
log_path=args.log_path,
error_log_path=args.error_path,
clean_logs=args.clean_logs,
batch_size=args.batch_size,
upload_timeout=args.upload_timeout,
document_location=args.document_location,
workspace_id=args.workspace_id
).upload_files()
)
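Beyond the standard library, the script only requires the openai and tqdm packages; assuming a Python 3 environment, they can be installed with, for example:
pip install openai tqdm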