Here is a standard script for uploading a large number of documents to Paradigm.
Uploads are limited to 6 documents per minute.
An example command to run the script:
python3 uploadPARADIGM.py /path/to/files/folder --api_key <my_api_key>
The script below displays a progress bar and sends the files in parallel batches of a given size (3 by default; if you run into rate-limit issues, try reducing the batch size).
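For example, to send files one at a time instead of three in parallel (the folder path and key below are placeholders):
python3 uploadPARADIGM.py /path/to/files/folder --api_key <my_api_key> --batch_size 1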
You can choose where to upload your documents: to your private space, to your company space, or to a workspace shared with a restricted set of members of your company.
By default, documents are uploaded to the company space.
If you want to upload documents to a workspace, you must also specify your workspace ID (which can be retrieved from the admin panel).
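For example, to upload to a workspace (the workspace ID below is a placeholder; use the one shown in your admin panel):
python3 uploadPARADIGM.py /path/to/files/folder --api_key <my_api_key> --document_location workspace --workspace_id 42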
Options
uploadPARADIGM.py [-h] [--log_path LOG_PATH] [--error_path ERROR_PATH]
[--clean_log_files] [--api_key API_KEY]
[--base_url BASE_URL] [--batch_size BATCH_SIZE]
[--document_location {private,company,workspace}]
[--workspace_id WORKSPACE_ID]
folder_path
positional arguments:
folder_path Path to the folder containing the documents to upload.
options:
-h, --help show this help message and exit
--log_path LOG_PATH Path to the JSON file to log file upload.
--error_path ERROR_PATH Path to the JSON file to log failed uploads.
--clean_log_files Clear the log files before starting.
--api_key API_KEY Paradigm API key to use.
--base_url BASE_URL Base url to use.
--batch_size BATCH_SIZE Number of parallel upload tasks to run.
--document_location {private,company,workspace} Paradigm space to store the documents into. Defaults to the company space.
--workspace_id WORKSPACE_ID The ID of the workspace to store the documents into. Only applicable if the document location is set to 'workspace'.
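Each processed file is appended as one JSON line to the log file (failures go to the error file). A successful entry looks roughly like this (values are illustrative):
{"id": "<document_id>", "filename": "report.pdf", "relative_path": "reports/report.pdf", "status": "embedded", "fail_reason": null}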
Full script
import dataclasses
from dataclasses import dataclass
from typing import Union, List, Literal, Tuple
from pathlib import Path
import os
import asyncio
from tqdm.asyncio import tqdm
import json
from openai import AsyncOpenAI
from openai.types import FileObject
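# File extensions the script searches for; rglob is case-sensitive, hence both lower- and upper-case variants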
SUPPORTED_FILE_FORMATS = ["PDF", "pdf", "docx", "DOCX", "doc", "DOC", "md", "MD"]
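# Document processing statuses reported by the files API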
ONGOING_STATUS = ["pending", "parsing", "embedding", "updating"]
SUCCESSFUL_STATUS = ["embedded", "success"]
FAILED_STATUS = ["parsing_failed", "embedding_failed", "fail", "failed"]
@dataclass
class UploadResponse:
id: Union[str, None]
bytes: Union[int, None]
file_path: Path
relative_path: Path
filename: Union[str, None]
status: str
fail_reason: Union[str, None]
@dataclass
class LogFileElement:
id: str
filename: str
relative_path: str
status: str
fail_reason: Union[str, None]
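# Helpers to page through the Paradigm file listing endpoint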
class Utility:
@staticmethod
async def get_files(client: AsyncOpenAI) -> List[FileObject]:
files = []
page_number = 1
while True:
list_response = await client.files.list(extra_query={"page": page_number})
files += list_response.data
if not list_response.has_next_page():
break
page_number += 1
return files
@staticmethod
async def get_file(client: AsyncOpenAI, file_id: str) -> Union[FileObject, None]:
page_number = 1
while True:
list_response = await client.files.list(extra_query={"page": page_number})
for file in list_response.data:
if file.id == file_id:
return file
if not list_response.has_next_page():
break
page_number += 1
return None
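# Discovers files, uploads them in parallel batches, polls their processing status and logs the results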
class FileUploader:
def __init__(
self, folder_path: Path, base_url: str, api_key: str,
log_path: Path, error_log_path: Path, clean_logs: bool,
batch_size: int, document_location: str, workspace_id: Union[int, None] = None
):
self.client = AsyncOpenAI(api_key=api_key, base_url=base_url)
self.folder_path = folder_path
self.log_path = log_path
self.error_log_path = error_log_path
self.clean_logs = clean_logs
self.batch_size = batch_size
self.document_location = document_location
self.workspace_id = workspace_id
def _load_uploaded_files(self) -> List[LogFileElement]:
data = []
if self.log_path.exists():
with open(self.log_path, "r") as f:
data = [LogFileElement(**json.loads(line)) for line in f]
return data
def _load_failed_files(self) -> List[LogFileElement]:
data = []
if self.error_log_path.exists():
with open(self.error_log_path, "r") as f:
data = [LogFileElement(**json.loads(line)) for line in f]
return data
def _log_file(self, file_info: UploadResponse, mode: Literal["info", "error"]):
data_to_log = LogFileElement(
id=file_info.id,
filename=file_info.filename,
relative_path=str(file_info.relative_path),
status=file_info.status,
fail_reason=file_info.fail_reason
)
file_to_use = self.log_path if mode == "info" else self.error_log_path
with open(file_to_use, "a") as f:
json.dump(dataclasses.asdict(data_to_log), f, ensure_ascii=False)
f.write("\n")
    async def _file_is_processed(self, file_id: str, retries: int, delay: float) -> Tuple[bool, str]:
        # Poll the file listing until the document reaches a terminal status or retries are exhausted
        for i in range(retries):
            file = await Utility.get_file(self.client, file_id)
            if file is not None:
                if file.status in FAILED_STATUS:
                    return False, file.status
                if file.status in SUCCESSFUL_STATUS:
                    return True, file.status
                if i == retries - 1:
                    return False, file.status
            await asyncio.sleep(delay)
        # The file never appeared in the listing within the allotted retries
        return False, "failed"
async def _upload_file(self, semaphore: asyncio.Semaphore, file_path: Path):
async with semaphore:
log_info = UploadResponse(
id=None,
bytes=None,
file_path=file_path,
relative_path=file_path.relative_to(self.folder_path),
filename=None,
status="failed",
fail_reason=None
)
try:
if os.path.getsize(file_path) > 0:
extra_body = {"collection_type": self.document_location}
if self.document_location == "workspace":
assert self.workspace_id is not None
extra_body["workspace_id"] = self.workspace_id
with open(file_path, "rb") as f:
response = await self.client.files.create(
file=f, purpose="documents", extra_body=extra_body
)
log_info.id = response.id
log_info.bytes = response.bytes
log_info.filename = response.filename
log_info.status = response.status
_, status = await self._file_is_processed(response.id, retries=10, delay=2)
log_info.status = status
else:
log_info.bytes = 0
log_info.fail_reason = "Empty file detected"
except Exception as e:
log_info.fail_reason = str(e)
if log_info.status in SUCCESSFUL_STATUS:
self._log_file(log_info, "info")
else:
self._log_file(log_info, "error")
async def upload_files(self):
if self.clean_logs:
self.log_path.unlink(missing_ok=True)
self.error_log_path.unlink(missing_ok=True)
semaphore = asyncio.Semaphore(self.batch_size)
uploaded_files = self._load_uploaded_files()
uploaded_file_paths = [file.relative_path for file in uploaded_files]
failed_files = self._load_failed_files()
# If no ID has been attributed to a file, then an unexpected error has happened
failed_files_paths = [file.relative_path for file in failed_files if file.id is not None]
unexpected_error_file_paths = [file.relative_path for file in failed_files if file.id is None]
print(f"Detected uploaded files: {len(uploaded_file_paths)}")
print(f"Detected failed files: {len(failed_files_paths)}")
print(f"Detected unexpected error files: {len(unexpected_error_file_paths)}")
files_to_upload = []
for file_format in SUPPORTED_FILE_FORMATS:
for file in self.folder_path.rglob(f'*.{file_format}'):
relative_path = str(file.relative_to(self.folder_path))
if relative_path in uploaded_file_paths:
continue
elif relative_path in failed_files_paths:
continue
elif relative_path in unexpected_error_file_paths:
files_to_upload.append(file)
else:
files_to_upload.append(file)
print(f"Detected files to upload: {len(files_to_upload)}")
with tqdm(total=len(files_to_upload)) as pb:
tasks = [
self._upload_file(semaphore, file) for file in files_to_upload
]
for task in asyncio.as_completed(tasks):
await task
pb.update(1)
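# Command-line interface: parse arguments and run the uploader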
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(
description="Usable script to upload multiple files to Paradigm"
)
parser.add_argument(
"folder_path", type=Path,
help="Path to the folder containing the documents to upload."
)
parser.add_argument(
"--log_path", default=Path.cwd() / Path("log_files.jsonl"), type=Path,
help="Path to the JSON file to log file upload."
)
parser.add_argument(
"--error_path", default=Path.cwd() / Path("error_files.jsonl"), type=Path,
help="Path to the JSON file to log failed uploads."
)
parser.add_argument(
"--clean_log_files", action="store_true",
help="Clear the log files before starting."
)
parser.add_argument(
"--api_key", default=None,
help="Paradigm API key to use."
)
parser.add_argument(
"--base_url", default="https://paradigm.lighton.ai/api/v2",
help="Base url to use."
)
parser.add_argument(
"--batch_size", default=3, type=int,
help="Number of parallel processes to run"
)
parser.add_argument(
"--document_location", type=str, choices=["private", "company", "workspace"], default="company",
help="Paradigm space to store the documents into. Defaults to the company space."
)
parser.add_argument(
"--workspace_id", type=int,
help="The ID of the workspace to store the documents into. Only applicable if the document location is set to 'workspace'."
)
args = parser.parse_args()
if args.api_key is None:
api_key = os.getenv("PARADIGM_API_KEY", None)
else:
api_key = args.api_key
assert api_key is not None, "No API key provided."
# Initialize the FileUploader instance
file_uploader = FileUploader(
folder_path=args.folder_path,
base_url=args.base_url,
api_key=api_key,
log_path=args.log_path,
error_log_path=args.error_path,
clean_logs=args.clean_log_files,
batch_size=args.batch_size,
document_location=args.document_location,
workspace_id=args.workspace_id
)
# Run the upload_files method
asyncio.run(file_uploader.upload_files())
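Instead of passing --api_key on the command line, you can also set the PARADIGM_API_KEY environment variable before running the script, for example:
export PARADIGM_API_KEY=<my_api_key>
python3 uploadPARADIGM.py /path/to/files/folder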