Example script to upload large numbers of documents.
The default throttle is set at 6 file uploads per minute.
You can launch the script with something like:
python3 upload_paradigm.py /path/to/files/folder --api_key <my_api_key>
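If you prefer not to pass the key on the command line, the script also falls back to the PARADIGM_API_KEY environment variable when --api_key is omitted:
export PARADIGM_API_KEY=<my_api_key>
python3 upload_paradigm.py /path/to/files/folder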
The script below displays a progress bar and sends files in parallel batches of a given size (3 by default). If you encounter throttling issues, try reducing the batch size.
You can choose where to upload your documents: your private space, your company space, or a workspace shared with other selected company members.
By default, the documents are uploaded to the company space.
If you want to upload documents to a workspace, you must also specify the ID of your workspace (it can be retrieved from the admin panel).
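For example, a launch that sends the documents to a shared workspace (replace <workspace_id> with the ID from the admin panel) could look like:
python3 upload_paradigm.py /path/to/files/folder --api_key <my_api_key> --document_location workspace --workspace_id <workspace_id>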
Options
usage: upload_paradigm.py [-h] [--log_path LOG_PATH] [--error_path ERROR_PATH]
[--clean_logs] [--api_key API_KEY] [--base_url BASE_URL]
[--batch_size BATCH_SIZE] [--upload_timeout UPLOAD_TIMEOUT]
[--document_location {private,company,workspace}]
[--workspace_id WORKSPACE_ID]
folder_path
positional arguments:
folder_path Path to the folder containing the documents to upload.
options:
-h, --help show this help message and exit
--log_path LOG_PATH Path to the JSONL file where successful uploads are logged.
--error_path ERROR_PATH
Path to the JSONL file where failed uploads are logged.
--clean_logs Clear log files before starting.
--api_key API_KEY Paradigm API key to use.
--base_url BASE_URL Base url to use.
--batch_size BATCH_SIZE
Number of files to upload in parallel.
--upload_timeout UPLOAD_TIMEOUT
Timeout in seconds for a file to be uploaded
--document_location {private,company,workspace}
Paradigm space to store the documents into. Defaults to the company space.
--workspace_id WORKSPACE_ID
The ID of the workspace to store the documents into. Only applicable if the document location is set to 'workspace'
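Successful uploads are recorded in the log file and skipped on later runs, so relaunching the script only retries files that previously failed; pass --clean_logs to start over instead. For instance, a rerun with a smaller batch size after hitting throttling limits could look like:
python3 upload_paradigm.py /path/to/files/folder --api_key <my_api_key> --batch_size 2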
Full script
import dataclasses
import json
import os
import asyncio
from dataclasses import dataclass
from pathlib import Path
from typing import Union, Tuple
from tqdm.asyncio import tqdm
from openai import AsyncOpenAI
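# Extensions accepted for upload, and the document statuses reported by the files API;
# the status sets are used further down to decide when polling for a file can stop.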
SUPPORTED_FILE_FORMATS = {"pdf", "docx", "doc", "md"}
ONGOING_STATUS = {"pending", "parsing", "embedding", "updating"}
SUCCESSFUL_STATUS = {"embedded", "success"}
FAILED_STATUS = {"parsing_failed", "embedding_failed", "fail", "failed"}
def load_log_file(log_path: Path):
"""Load logs safely, ignoring empty or invalid JSON lines."""
entries = []
if log_path.exists() and log_path.stat().st_size > 0:
with open(log_path, "r", encoding="utf-8") as file:
for line in file:
line = line.strip()
if line: # Ignore empty lines
try:
entries.append(json.loads(line))
except json.JSONDecodeError as e:
print(f"Skipping invalid log entry: {line} | Error: {e}")
return entries
def save_log_entry(log_path: Path, entry: dict):
"""Ensure all Path objects are converted to strings before logging."""
entry = {key: str(value) if isinstance(value, Path) else value for key, value in entry.items()}
with open(log_path, "a", encoding="utf-8") as file:
json.dump(entry, file, ensure_ascii=False)
file.write("\n")
@dataclass
class UploadResponse:
id: Union[str, None]
bytes: Union[int, None]
file_path: Path
relative_path: Path
filename: Union[str, None]
status: str
fail_reason: Union[str, None]
class FileUploader:
def __init__(
self, folder_path: Path, base_url: str, api_key: str, log_path: Path,
error_log_path: Path, clean_logs: bool, batch_size: int, upload_timeout: float,
document_location: str, workspace_id: Union[int, None] = None
):
self.client = AsyncOpenAI(api_key=api_key, base_url=base_url)
self.folder_path = folder_path
self.log_path = log_path
self.error_log_path = error_log_path
self.clean_logs = clean_logs
self.batch_size = batch_size
self.upload_timeout = upload_timeout
self.document_location = document_location
self.workspace_id = workspace_id
if clean_logs:
self.log_path.unlink(missing_ok=True)
self.error_log_path.unlink(missing_ok=True)
async def _file_is_processed(self, file_id: str, timeout: float) -> Tuple[bool, str]:
"""Checks if the file has been processed within the given timeout."""
async def check_status():
while True:
file = await self.client.files.retrieve(file_id=file_id)
if file.status in FAILED_STATUS:
return False, file.status
if file.status in SUCCESSFUL_STATUS:
return True, file.status
if file.status not in ONGOING_STATUS:
raise Exception(f"Unexpected file status: {file.status}")
await asyncio.sleep(2)
try:
return await asyncio.wait_for(check_status(), timeout=timeout)
except asyncio.TimeoutError:
result = await self.client.files.delete(file_id=file_id)
if not result.deleted:
raise Exception(f"Failed to delete timed out file: {file_id}")
raise Exception("File upload timed out")
async def _upload_file(self, semaphore: asyncio.Semaphore, file_path: Path):
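        """Upload a single file, wait for it to finish processing, and log the outcome."""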
async with semaphore:
try:
if file_path.stat().st_size == 0:
raise Exception("Empty file detected")
else:
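                    # collection_type selects the target space (private, company or workspace);
                    # workspace uploads also carry the workspace_id.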
extra_body = {"collection_type": self.document_location}
if self.document_location == "workspace" and self.workspace_id:
extra_body["workspace_id"] = self.workspace_id
with file_path.open("rb") as file:
response = await self.client.files.create(file=file, purpose="documents", extra_body=extra_body)
response_data = UploadResponse(
id=response.id, bytes=response.bytes, file_path=file_path,
relative_path=file_path.relative_to(self.folder_path), filename=response.filename,
status=response.status, fail_reason=None
)
success, status = await self._file_is_processed(response.id, timeout=self.upload_timeout)
response_data.status = status
save_log_entry(self.log_path if success else self.error_log_path, dataclasses.asdict(response_data))
except Exception as e:
response_data = UploadResponse(
id=None, bytes=file_path.stat().st_size, file_path=file_path,
relative_path=file_path.relative_to(self.folder_path),
filename=None, status="fail", fail_reason=str(e)
)
save_log_entry(self.error_log_path, dataclasses.asdict(response_data))
async def upload_files(self):
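        """Upload every supported file under folder_path, skipping files already logged as successful."""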
semaphore = asyncio.Semaphore(self.batch_size)
existing_logs = {entry["relative_path"] for entry in load_log_file(self.log_path)}
        files_to_upload = [file for file in self.folder_path.rglob("*") if
                           file.is_file() and file.suffix[1:].lower() in SUPPORTED_FILE_FORMATS]
files_to_upload = [file for file in files_to_upload if
str(file.relative_to(self.folder_path)) not in existing_logs]
print(f"Files to upload: {len(files_to_upload)}")
with tqdm(total=len(files_to_upload)) as progress:
tasks = [self._upload_file(semaphore, file) for file in files_to_upload]
for task in asyncio.as_completed(tasks):
await task
progress.update(1)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(
description="Usable script to upload multiple files to Paradigm"
)
parser.add_argument(
"folder_path", type=Path,
help="Path to the folder containing the documents to upload."
)
parser.add_argument(
"--log_path", default=Path.cwd() / Path("log_files.jsonl"), type=Path,
help="Path to the JSON file to log file upload."
)
parser.add_argument(
"--error_path", default=Path.cwd() / Path("error_files.jsonl"), type=Path,
help="Path to the JSON file to log failed uploads."
)
parser.add_argument(
"--clean_logs", action="store_true",
help="Clear log files before starting."
)
parser.add_argument(
"--api_key", default=None,
help="Paradigm API key to use."
)
parser.add_argument(
"--base_url", default="https://paradigm.lighton.ai/api/v2",
help="Base url to use."
)
parser.add_argument(
"--batch_size", default=3, type=int,
help="Number of parallel processes to run"
)
parser.add_argument(
"--upload_timeout", default=120, type=int, help="Timeout in seconds for a file to be uploaded"
)
parser.add_argument(
"--document_location", type=str, choices=["private", "company", "workspace"], default="company",
help="Paradigm space to store the documents into. Defaults to the company space."
)
parser.add_argument(
"--workspace_id", type=int,
help="The ID of the workspace to store the documents into. Only applicable if the document location is set to 'workspace'."
)
args = parser.parse_args()
if args.api_key is None:
api_key = os.getenv("PARADIGM_API_KEY", None)
else:
api_key = args.api_key
assert api_key is not None, "No API key provided."
asyncio.run(
FileUploader(
folder_path=args.folder_path,
base_url=args.base_url,
api_key=api_key,
log_path=args.log_path,
error_log_path=args.error_path,
clean_logs=args.clean_logs,
batch_size=args.batch_size,
upload_timeout=args.upload_timeout,
document_location=args.document_location,
workspace_id=args.workspace_id
).upload_files()
)
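After a run, the error log can be inspected to see why individual files failed. The snippet below is a minimal sketch, separate from the script above, that assumes the default error_files.jsonl path in the current working directory and prints each failed file with its recorded reason:
import json
from pathlib import Path

error_log = Path("error_files.jsonl")  # default --error_path used by the upload script
if error_log.exists():
    for line in error_log.read_text(encoding="utf-8").splitlines():
        if not line.strip():
            continue
        entry = json.loads(line)
        # Each entry is a logged UploadResponse: relative_path, status, fail_reason, ...
        print(f"{entry['relative_path']}: {entry.get('fail_reason') or entry['status']}")
else:
    print("No failed uploads logged.")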