Example script to upload large numbers of documents.
The default throttle is set at 6 file uploads per minute.
You can launch the script with something like:
python3 script.py /path/to/files/folder --api_key <my_api_key>
The script below displays a progress bar and sends files in parallel batches of a given size (3 by default; if you run into throttling issues, try reducing the batch size).
You can choose where to upload your documents: your private space, your company space, or a workspace shared with other selected company members.
By default, the documents are uploaded to the company space.
If you want to upload documents to a workspace, you must also specify the ID of your workspace (it can be retrieved from the admin panel).
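For example, assuming a workspace whose ID is 42 (replace it with your own), the launch command could look like:
python3 script.py /path/to/files/folder --api_key <my_api_key> --document_location workspace --workspace_id 42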
Options
uploadPARADIGM.py [-h] [--log_path LOG_PATH] [--error_path ERROR_PATH]
[--clean_log_files] [--api_key API_KEY]
[--base_url BASE_URL] [--batch_size BATCH_SIZE]
[--document_location {private,company,workspace}]
[--workspace_id WORKSPACE_ID]
folder_path
positional arguments:
folder_path Path to the folder containing the documents to upload.
options:
-h, --help show this help message and exit
--log_path LOG_PATH Path to the JSON file used to log successful uploads.
--error_path ERROR_PATH Path to the JSON file to log failed uploads.
--clean_log_files Clear the log files before starting.
--api_key API_KEY Paradigm API key to use.
--base_url BASE_URL Base URL to use.
--batch_size BATCH_SIZE Number of files to upload in parallel.
--document_location {private,company,workspace} Paradigm space to store the documents into. Defaults to the company space.
--workspace_id WORKSPACE_ID The ID of the workspace to store the documents into. Only applicable if the document location is set to 'workspace'.
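The script keeps a record of previous runs in its two log files (log_files.jsonl and error_files.jsonl by default): documents already logged as uploaded, as well as failures that were assigned an ID, are skipped on the next run, so re-launching the same command simply resumes the upload. To start again from scratch, clear the logs with --clean_log_files, for example:
python3 script.py /path/to/files/folder --api_key <my_api_key> --clean_log_files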
Full script
import dataclasses
from dataclasses import dataclass
from typing import Union, List, Literal, Tuple
from pathlib import Path
import os
import asyncio
from tqdm.asyncio import tqdm
import json
from openai import AsyncOpenAI
from openai.types import FileObject
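# File extensions accepted for upload, and the processing statuses the Paradigm files API
# can report for a document (still in progress, successfully embedded, or failed).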
SUPPORTED_FILE_FORMATS = ["PDF", "pdf", "docx", "DOCX", "doc", "DOC", "md", "MD"]
ONGOING_STATUS = ["pending", "parsing", "embedding", "updating"]
SUCCESSFUL_STATUS = ["embedded", "success"]
FAILED_STATUS = ["parsing_failed", "embedding_failed", "fail", "failed"]
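# Everything we know about a single upload attempt, used to build the log entries below.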
@dataclass
class UploadResponse:
id: Union[str, None]
bytes: Union[int, None]
file_path: Path
relative_path: Path
filename: Union[str, None]
status: str
fail_reason: Union[str, None]
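# A single line of the upload or error log file.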
@dataclass
class LogFileElement:
id: str
filename: str
relative_path: str
status: str
fail_reason: Union[str, None]
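# Helpers that page through the files endpoint of the OpenAI-compatible client.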
class Utility:
@staticmethod
async def get_files(client: AsyncOpenAI) -> List[FileObject]:
files = []
page_number = 1
while True:
list_response = await client.files.list(extra_query={"page": page_number})
files += list_response.data
if not list_response.has_next_page():
break
page_number += 1
return files
@staticmethod
async def get_file(client: AsyncOpenAI, file_id: str) -> Union[FileObject, None]:
page_number = 1
while True:
list_response = await client.files.list(extra_query={"page": page_number})
for file in list_response.data:
if file.id == file_id:
return file
if not list_response.has_next_page():
break
page_number += 1
return None
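# Uploads every supported file found under a folder: files already recorded in the logs are
# skipped, concurrency is capped by a semaphore, and each result is appended to a log file.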
class FileUploader:
def __init__(
self, folder_path: Path, base_url: str, api_key: str,
log_path: Path, error_log_path: Path, clean_logs: bool,
batch_size: int, document_location: str, workspace_id: Union[int, None] = None
):
self.client = AsyncOpenAI(api_key=api_key, base_url=base_url)
self.folder_path = folder_path
self.log_path = log_path
self.error_log_path = error_log_path
self.clean_logs = clean_logs
self.batch_size = batch_size
self.document_location = document_location
self.workspace_id = workspace_id
def _load_uploaded_files(self) -> List[LogFileElement]:
data = []
if self.log_path.exists():
with open(self.log_path, "r") as f:
data = [LogFileElement(**json.loads(line)) for line in f]
return data
def _load_failed_files(self) -> List[LogFileElement]:
data = []
if self.error_log_path.exists():
with open(self.error_log_path, "r") as f:
data = [LogFileElement(**json.loads(line)) for line in f]
return data
def _log_file(self, file_info: UploadResponse, mode: Literal["info", "error"]):
data_to_log = LogFileElement(
id=file_info.id,
filename=file_info.filename,
relative_path=str(file_info.relative_path),
status=file_info.status,
fail_reason=file_info.fail_reason
)
file_to_use = self.log_path if mode == "info" else self.error_log_path
with open(file_to_use, "a") as f:
json.dump(dataclasses.asdict(data_to_log), f, ensure_ascii=False)
f.write("\n")
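    # Poll the files API until the document reaches a terminal status, or the retries run out.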
    async def _file_is_processed(self, file_id: str, retries: int, delay: float) -> Tuple[bool, str]:
        for i in range(retries):
            file = await Utility.get_file(self.client, file_id)
            if file is not None:
                if file.status in FAILED_STATUS:
                    return False, file.status
                if i == retries - 1:
                    return file.status in SUCCESSFUL_STATUS, file.status
            await asyncio.sleep(delay)
        # The file was never found in the listing: report it as unprocessed instead of returning None
        return False, "not_found"
async def _upload_file(self, semaphore: asyncio.Semaphore, file_path: Path):
async with semaphore:
log_info = UploadResponse(
id=None,
bytes=None,
file_path=file_path,
relative_path=file_path.relative_to(self.folder_path),
filename=None,
status="failed",
fail_reason=None
)
try:
if os.path.getsize(file_path) > 0:
extra_body = {"collection_type": self.document_location}
if self.document_location == "workspace":
assert self.workspace_id is not None
extra_body["workspace_id"] = self.workspace_id
with open(file_path, "rb") as f:
response = await self.client.files.create(
file=f, purpose="documents", extra_body=extra_body
)
log_info.id = response.id
log_info.bytes = response.bytes
log_info.filename = response.filename
log_info.status = response.status
# retries, delay = self._compile_file_size(log_info.bytes)
_, status = await self._file_is_processed(response.id, retries=10, delay=2)
log_info.status = status
else:
log_info.bytes = 0
log_info.fail_reason = "Empty file detected"
except Exception as e:
log_info.fail_reason = str(e)
if log_info.status in SUCCESSFUL_STATUS:
self._log_file(log_info, "info")
else:
self._log_file(log_info, "error")
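    # Scan the folder for supported files, skip those already handled in a previous run,
    # and upload the remaining ones concurrently behind a progress bar.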
async def upload_files(self):
if self.clean_logs:
self.log_path.unlink(missing_ok=True)
self.error_log_path.unlink(missing_ok=True)
semaphore = asyncio.Semaphore(self.batch_size)
uploaded_files = self._load_uploaded_files()
uploaded_file_paths = [file.relative_path for file in uploaded_files]
failed_files = self._load_failed_files()
# If no ID has been attributed to a file, then an unexpected error has happened
failed_files_paths = [file.relative_path for file in failed_files if file.id is not None]
unexpected_error_file_paths = [file.relative_path for file in failed_files if file.id is None]
print(f"Detected uploaded files: {len(uploaded_file_paths)}")
print(f"Detected failed files: {len(failed_files_paths)}")
print(f"Detected unexpected error files: {len(unexpected_error_file_paths)}")
files_to_upload = []
for file_format in SUPPORTED_FILE_FORMATS:
for file in self.folder_path.rglob(f'*.{file_format}'):
relative_path = str(file.relative_to(self.folder_path))
if relative_path in uploaded_file_paths:
continue
elif relative_path in failed_files_paths:
continue
elif relative_path in unexpected_error_file_paths:
files_to_upload.append(file)
else:
files_to_upload.append(file)
print(f"Detected files to upload: {len(files_to_upload)}")
with tqdm(total=len(files_to_upload)) as pb:
tasks = [
self._upload_file(semaphore, file) for file in files_to_upload
]
for task in asyncio.as_completed(tasks):
await task
pb.update(1)
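# Command-line entry point: parse the arguments, resolve the API key and launch the upload.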
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(
description="Usable script to upload multiple files to Paradigm"
)
parser.add_argument(
"folder_path", type=Path,
help="Path to the folder containing the documents to upload."
)
parser.add_argument(
"--log_path", default=Path.cwd() / Path("log_files.jsonl"), type=Path,
help="Path to the JSON file to log file upload."
)
parser.add_argument(
"--error_path", default=Path.cwd() / Path("error_files.jsonl"), type=Path,
help="Path to the JSON file to log failed uploads."
)
parser.add_argument(
"--clean_log_files", action="store_true",
help="Clear the log files before starting."
)
parser.add_argument(
"--api_key", default=None,
help="Paradigm API key to use."
)
parser.add_argument(
"--base_url", default="https://paradigm.lighton.ai/api/v2",
help="Base url to use."
)
parser.add_argument(
"--batch_size", default=3, type=int,
help="Number of parallel processes to run"
)
parser.add_argument(
"--document_location", type=str, choices=["private", "company", "workspace"], default="company",
help="Paradigm space to store the documents into. Defaults to the company space."
)
parser.add_argument(
"--workspace_id", type=int,
help="The ID of the workspace to store the documents into. Only applicable if the document location is set to 'workspace'."
)
args = parser.parse_args()
if args.api_key is None:
api_key = os.getenv("PARADIGM_API_KEY", None)
else:
api_key = args.api_key
assert api_key is not None, "No API key provided."
# Initialize the FileUploader instance
file_uploader = FileUploader(
folder_path=args.folder_path,
base_url=args.base_url,
api_key=api_key,
log_path=args.log_path,
error_log_path=args.error_path,
clean_logs=args.clean_log_files,
batch_size=args.batch_size,
document_location=args.document_location,
workspace_id=args.workspace_id
)
# Run the upload_files method
asyncio.run(file_uploader.upload_files())