Add a large number of documents

Here is a boilerplate script to upload a large number of documents.

The default throttle is set at 6 file uploads per minute.

You can launch the script with something like:

python3 script.py /path/to/files/folder --api_key <my_api_key>
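
Alternatively, you can omit --api_key and set the PARADIGM_API_KEY environment variable instead; the script falls back to it when no key is passed on the command line:

export PARADIGM_API_KEY=<my_api_key>
python3 script.py /path/to/files/folder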

The script below displays a progress bar and sends files in parallel batches of a given size (10 by default). If you encounter throttling issues, try reducing the batch size, or see the pacing sketch after the script.

from openai import AsyncOpenAI
from pathlib import Path
import os
import asyncio
from tqdm.asyncio import tqdm
import json

SUPPORTED_FILE_FORMATS = ["PDF", "pdf", "docx", "doc", "DOC", "md", "MD"]
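# Note: both upper- and lower-case variants are listed above because the
# rglob extension match used below is case-sensitive (except on Windows).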


def load_uploaded_files(log_path: Path):
    """Return the set of relative paths already recorded in the upload log."""
    if log_path.exists():
        with open(log_path, "r") as f:
            return set(json.load(f))
    return set()


def save_uploaded_file(relative_file_path: Path, log_path: Path):
    """Record a successfully uploaded file (as a relative path) in the JSON log."""
    if log_path.exists():
        with open(log_path, "r") as f:
            uploaded_files = set(json.load(f))
    else:
        uploaded_files = set()

    uploaded_files.add(str(relative_file_path))

    with open(log_path, "w") as f:
        json.dump(list(uploaded_files), f)


async def upload_file(
    api_key: str, base_url: str, file_path: Path,
    folder_path: Path, semaphore: asyncio.Semaphore
):
    # The semaphore caps how many uploads run concurrently (one slot per file).
    async with semaphore:
        client = AsyncOpenAI(api_key=api_key, base_url=base_url)
        with open(file_path, "rb") as f:
            response = await client.files.create(file=f, purpose="documents")
        return response, file_path.relative_to(folder_path)


async def main(folder_path: Path, base_url: str, api_key: str, batch_size: int, log_path: Path):
    uploaded_files = load_uploaded_files(log_path)

    # Collect every supported file under the folder, skipping files already logged.
    files_list = [
        file for ext in SUPPORTED_FILE_FORMATS
        for file in folder_path.rglob(f'*.{ext}')
        if str(file.relative_to(folder_path)) not in uploaded_files
    ]

    print(f"Detected files to upload: {len(files_list)}")

    semaphore = asyncio.Semaphore(batch_size)

    with tqdm(total=len(files_list)) as pb:
        tasks = [upload_file(api_key, base_url, file, folder_path, semaphore) for file in files_list]
        responses = []
        for task in asyncio.as_completed(tasks):
            resp, relative_path = await task
            responses.append(resp)
            if resp.status == "success":
                save_uploaded_file(relative_path, log_path)
            elif resp.status == "failed":
                print(f"Failed upload: {resp}")
            pb.update(1)

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="Script to upload multiple files to Paradigm"
    )
    parser.add_argument(
        "folder_path", type=Path,
        help="Path to the folder containing the documents to upload."
    )
    parser.add_argument(
        "--log_path", default=Path.cwd() / Path("uploaded_files.json"), type=Path,
        help="Path to the JSON file used to log uploaded files."
    )
    parser.add_argument(
        "--api_key", default=None,
        help="Paradigm API key to use."
    )
    parser.add_argument(
        "--base_url", default="https://paradigm.lighton.ai/api/v2",
        help="Base URL to use."
    )
    parser.add_argument(
        "--batch_size", default=10, type=int,
        help="Number of parallel uploads to run."
    )
    args = parser.parse_args()

    # Fall back to the PARADIGM_API_KEY environment variable if --api_key is not given.
    if args.api_key is None:
        api_key = os.getenv("PARADIGM_API_KEY", None)
    else:
        api_key = args.api_key

    asyncio.run(
        main(
            folder_path=args.folder_path,
            base_url=args.base_url,
            api_key=api_key,
            batch_size=args.batch_size,
            log_path=args.log_path
        )
    )
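
Successful uploads are recorded in the log file (uploaded_files.json by default), so if a run is interrupted you can simply relaunch the same command: files already present in the log are skipped.

If reducing the batch size is not enough to stay under the 6-files-per-minute throttle, you can also pace uploads explicitly. The helper below is a minimal sketch and is not part of the script above (wait_for_slot, rate_lock and RATE_LIMIT_SECONDS are names introduced here): create the lock in main() next to the semaphore, pass it to upload_file, and await wait_for_slot(rate_lock) just before the files.create call.

import asyncio

RATE_LIMIT_SECONDS = 10  # 60 seconds / 6 files per minute

async def wait_for_slot(rate_lock: asyncio.Lock):
    # Each caller holds the lock while sleeping, so upload starts are spaced
    # at least RATE_LIMIT_SECONDS apart, regardless of batch_size.
    async with rate_lock:
        await asyncio.sleep(RATE_LIMIT_SECONDS)

With this in place, the semaphore still limits how many uploads are in flight at once, while the lock only limits how often new uploads start.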