You can create an Upload Session in Paradigm, attach documents to it, and follow the state of the session over time.
Overview
Several API endpoints are available to create and manage Upload Sessions in the Paradigm platform.
The script presented in this article provides an asynchronous implementation for uploading multiple files. It uses Python's asyncio and aiohttp libraries to handle concurrent file uploads efficiently.
Before starting any uploads, the script automatically deactivates all existing upload sessions and creates a new dedicated session.
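For reference, here is a condensed sketch of those two calls. It mirrors the create_session method in the full script at the end of this article; the helper name start_new_upload_session is purely illustrative, but the endpoints and Bearer authentication are the same ones the script uses.

import aiohttp

async def start_new_upload_session(base_url: str, api_key: str) -> dict:
    """Deactivate any existing upload sessions, then create a fresh one."""
    headers = {"Authorization": f"Bearer {api_key}"}
    async with aiohttp.ClientSession() as session:
        # Deactivate whatever session is currently active
        async with session.post(f"{base_url}/api/v2/upload-session/deactivate", headers=headers) as resp:
            resp.raise_for_status()
        # Create the new session; the returned payload contains its "uuid"
        async with session.post(f"{base_url}/api/v2/upload-session", headers=headers) as resp:
            resp.raise_for_status()
            return await resp.json()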
You can choose where to upload your documents:
- In your private space
- In your company space
- In a workspace shared with other selected company members
By default, the documents will be uploaded to the company space.
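The chosen location is sent with each upload as the collection_type form field, together with workspace_id when a workspace is targeted. The sketch below illustrates that mapping; build_upload_form is not part of the script, only an illustration of the fields it sends.

from typing import Optional

import aiohttp

def build_upload_form(session_id: str, document_location: str, workspace_id: Optional[int] = None) -> aiohttp.FormData:
    """Illustrative helper: build the form fields carried by each upload request."""
    data = aiohttp.FormData()
    data.add_field("session_id", session_id)
    # One of "private", "company" or "workspace", mirroring --document-location
    data.add_field("collection_type", document_location)
    if document_location == "workspace":
        if workspace_id is None:
            raise ValueError("A workspace ID is required when uploading to a workspace")
        data.add_field("workspace_id", str(workspace_id))
    return data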
Prerequisites
- Python 3.7+
- Installation of the required packages:
pip install aiohttp tqdm
  - aiohttp: for async HTTP requests
  - tqdm: for progress bar visualization
Concurrency Control
The script uses a semaphore to limit the number of concurrent uploads, preventing server overload while maintaining efficient throughput. Adjust the --batch-size parameter based on your network conditions and server capacity.
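The pattern is the same one the full script uses: every upload coroutine acquires a shared asyncio.Semaphore, so at most batch_size requests are in flight at once. A minimal, standalone illustration (the sleep stands in for the actual HTTP upload):

import asyncio

async def limited_upload(i: int, semaphore: asyncio.Semaphore) -> int:
    # At most `batch_size` coroutines execute this block at the same time
    async with semaphore:
        await asyncio.sleep(0.1)  # stand-in for an HTTP upload
        return i

async def demo():
    semaphore = asyncio.Semaphore(5)  # mirrors --batch-size
    results = await asyncio.gather(*(limited_upload(i, semaphore) for i in range(20)))
    print(f"Completed {len(results)} uploads")

asyncio.run(demo())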
Command Line Arguments
python upload_session_async.py \
  --api-key="your_api_key" \
  --base-url="http://localhost:8000" \
  --files-dir="/path/to/files" \
  --document-location="company" \
  --workspace-id=123
Arguments Description
| Argument | Description | Default value |
| --- | --- | --- |
| --api-key (required) | Paradigm API key (can also be set via the PARADIGM_API_KEY environment variable) | None |
| --base-url | Base URL of the Paradigm API | http://localhost:8000 |
| --files-dir (required) | Directory containing files to upload | None |
| --document-location | Where to store documents (choices: private, company, workspace) | company |
| --workspace-id | Workspace ID for workspace storage (required if --document-location is workspace) | None |
| --batch-size | Maximum number of concurrent uploads | 5 |
| --log-path | Path where to save logs (in addition to console output) | upload_session.log |
| --clean-logs | If set, delete the existing log file before starting | False |
Upload Session Status API
A dedicated API endpoint is available to monitor the progress of the upload session, check which documents have been embedded successfully, and identify any failures.
Endpoint: GET /api/v2/upload-session/{uuid:uuid}
Response: The endpoint returns details about the upload session, including its unique identifier, validity status, timestamps for creation and last update, and a list of uploaded documents with their statuses (embedded or failed).
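A minimal sketch of how the status endpoint can be polled with aiohttp; the exact field names in the response (for example a list of documents with their statuses) are assumptions based on the description above, so adapt the parsing to the actual payload.

import asyncio
import aiohttp

async def fetch_session_status(base_url: str, api_key: str, session_uuid: str) -> dict:
    """Retrieve the details of an upload session from the status endpoint."""
    headers = {"Authorization": f"Bearer {api_key}"}
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{base_url}/api/v2/upload-session/{session_uuid}", headers=headers) as response:
            response.raise_for_status()
            return await response.json()

# Example usage (the "documents" key is an assumption based on the description above):
# status = asyncio.run(fetch_session_status("http://localhost:8000", "your_api_key", "<session-uuid>"))
# for document in status.get("documents", []):
#     print(document)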
Full Upload Script
The script below provides real-time tracking and ensures transparency in the document processing workflow:
# /// script
# requires-python = ">=3.7"
# dependencies = [
#     "aiohttp",
#     "tqdm",
# ]
# ///
from __future__ import annotations  # allow `int | None` and `tuple[...]` annotations on Python 3.7+

import argparse
import asyncio
import json
import logging
import os
from pathlib import Path

import aiohttp
from tqdm.asyncio import tqdm

logger = logging.getLogger(__name__)
def configure_logging(log_path=None, clean_logs=False):
    """Configure the logging system"""
    # Basic logger configuration
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)

    # Formatter for the logs
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

    # Handler for the console
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    root_logger.addHandler(console_handler)

    # Handler for the file if a path is specified
    if log_path:
        log_file = Path(log_path)

        # Clean existing logs if requested
        if clean_logs and log_file.exists():
            log_file.unlink()

        # Create parent directory if necessary
        log_file.parent.mkdir(parents=True, exist_ok=True)

        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        root_logger.addHandler(file_handler)
        logger.info(f"Logs will be saved to {log_file.absolute()}")
class AsyncUploadSession:
    def __init__(
        self, base_url: str, api_key: str, document_location: str, workspace_id: int | None = None, batch_size: int = 5
    ):
        self.base_url = base_url
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.document_location = document_location
        self.workspace_id = workspace_id
        self.batch_size = batch_size

    async def create_session(self, session: aiohttp.ClientSession) -> dict:
        """Create a new upload session"""
        # First, deactivate any existing sessions
        async with session.post(f"{self.base_url}/api/v2/upload-session/deactivate", headers=self.headers) as response:
            await response.json()
            logger.info("Successfully deactivated existing sessions")

        # Then create a new session
        async with session.post(f"{self.base_url}/api/v2/upload-session", headers=self.headers) as response:
            response.raise_for_status()
            session_data = await response.json()
            logger.info("Successfully created a new session")
            return session_data

    async def upload_file(
        self,
        session: aiohttp.ClientSession,
        session_id: str,
        file_path: Path,
        semaphore: asyncio.Semaphore,
    ) -> tuple[Path, dict]:
        """Upload a single file to the session with additional data"""
        # Use the semaphore to limit concurrent calls
        async with semaphore:
            data = aiohttp.FormData()
            data.add_field("description", "File upload")
            data.add_field("session_id", session_id)
            data.add_field("collection_type", self.document_location)
            if self.document_location == "workspace" and self.workspace_id:
                data.add_field("workspace_id", str(self.workspace_id))

            with open(file_path, "rb") as file_content:
                data.add_field("file", file_content, filename=file_path.name, content_type="application/octet-stream")
                async with session.post(
                    f"{self.base_url}/api/v2/upload-session/{session_id}", headers=self.headers, data=data
                ) as response:
                    status = response.status
                    response_body = await response.text()
                    if status >= 400:
                        logger.error(f"Error {status} uploading file {file_path.name}: {response_body}")
                        response.raise_for_status()
                    result = json.loads(response_body)
                    return file_path, result
    async def upload_files(self, files_dir: Path):
        async with aiohttp.ClientSession() as session:
            semaphore = asyncio.Semaphore(self.batch_size)
            logger.info(f"Limiting concurrent uploads to {self.batch_size}")

            session_data = await self.create_session(session)
            session_id = session_data["uuid"]

            # Retrieve the list of files (including in subfolders)
            files = [f for f in files_dir.rglob("*") if f.is_file()]
            tasks = [self.upload_file(session, session_id, file_path, semaphore) for file_path in files]

            async def safe_task(task):
                try:
                    return await task
                except Exception as e:
                    return e

            results = await tqdm.gather(*[safe_task(t) for t in tasks])

            successful_uploads = []
            failed_uploads = []
            for file_path, result in zip(files, results):
                if isinstance(result, Exception):
                    logger.error(f"Failed to upload file: {file_path.name}: {result}")
                    failed_uploads.append(file_path.name)
                else:
                    successful_uploads.append(file_path.name)

            # Display the summary
            logger.info("=" * 50)
            logger.info(f"Upload session summary for {len(files)} files:")
            logger.info(f"✅ Successfully uploaded: {len(successful_uploads)} files")
            if failed_uploads:
                logger.info(f"❌ Failed to upload: {len(failed_uploads)} files")
                logger.info("Failed files:")
                for failed_file in failed_uploads:
                    logger.info(f" - {failed_file}")
            logger.info("=" * 50)

            return {"total": len(files), "successful": successful_uploads, "failed": failed_uploads}
async def main_async():
    parser = argparse.ArgumentParser(description="Script to upload multiple files to Paradigm")
    parser.add_argument("--api-key", default=None, help="Paradigm API key to use.")
    parser.add_argument("--base-url", default="http://localhost:8000", help="Base URL to use.")
    parser.add_argument("--files-dir", type=Path, required=True, help="Directory containing files to upload")
    parser.add_argument(
        "--document-location",
        type=str,
        choices=["private", "company", "workspace"],
        default="company",
        help="Paradigm space to store the documents into. Defaults to the company space.",
    )
    parser.add_argument(
        "--workspace-id",
        type=int,
        help="The ID of the workspace to store the documents into. "
        "Only applicable if the document location is set to 'workspace'.",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=5,
        help="Maximum number of concurrent uploads to perform",
    )
    parser.add_argument(
        "--log-path",
        type=str,
        default="upload_session.log",
        help="Path where to save logs. If not provided, logs will only be displayed in console.",
    )
    parser.add_argument(
        "--clean-logs",
        action="store_true",
        help="If set, existing log file will be deleted before starting.",
    )
    args = parser.parse_args()

    # Configure the logging system
    configure_logging(args.log_path, args.clean_logs)

    if args.api_key is None:
        api_key = os.getenv("PARADIGM_API_KEY", None)
    else:
        api_key = args.api_key
    assert api_key is not None, "No API key provided."

    # Check if the directory exists
    if not args.files_dir.exists():
        raise FileNotFoundError(f"Directory not found: {args.files_dir}")
    logger.info(f"Using directory: {args.files_dir.absolute()}")

    files = [f for f in args.files_dir.rglob("*") if f.is_file()]
    logger.info(f"Found files: {[f.name for f in files]}")

    uploader = AsyncUploadSession(args.base_url, api_key, args.document_location, args.workspace_id, args.batch_size)
    await uploader.upload_files(args.files_dir)


def main():
    asyncio.run(main_async())


if __name__ == "__main__":
    main()