Vous pouvez créer une session de chargement dans Paradigm, lui attribuer des documents et suivre la progression du chargement au fil du temps.
Vue d'ensemble
Plusieurs points de terminaison de l'API sont disponibles pour créer et gérer des sessions de chargement sur la plateforme Paradigm.
Le script présenté dans cet article fournit une implémentation asynchrone pour le chargement de plusieurs fichiers. Il utilise les bibliothèques
asyncio
et aiohttp
de Python pour gérer efficacement les chargement de fichiers simultanés.Avant de commencer les chargements, le script désactive automatiquement toutes les sessions existantes et crée une nouvelle session dédiée.
Vous pouvez également choisir l'endroit où vous souhaitez charger vos documents :
- Dans votre espace privé
- Dans l'espace de votre entreprise
- Dans un espace de travail partagé avec d'autres membres de l'entreprise sélectionnés.
Par défaut, les documents seront chargés dans l'espace de l'entreprise.
Prérequis
- Python 3.7+
- Installation des paquets nécessaires :
pip install aiohttp tqdm
aiohttp
: pour les requêtes HTTP asynchronestqdm
: Pour la visualisation de la barre de progression
Contrôle de la simultanéité
Le script utilise un sémaphore pour limiter le nombre de chargements simultanés, afin d'éviter une surcharge du serveur tout en maintenant un débit efficace. Ajustez le paramètre --batch_size
en fonction des conditions de votre réseau et de la capacité de votre serveur.
Arguments de la ligne de commande
python upload_session_async.py
--api-key="your_api_key »
--base-url="http://localhost:8000 »
--files-dir="/path/to/files »
--document-location="company »
--workspace-id=123
Arguments Description
Argument | Description | Valeur par défaut |
(required) |
Clé de l'API Paradigm (peut également être définie via la variable d'environnement PARADIGM_API_KEY ) |
Aucune |
--base-url |
URL de base de l'API Paradigm | http://localhost:8000 |
(required) |
Répertoire contenant les fichiers à charger | Aucune |
--document-location |
Où stocker les documents (choix : private , company , workspace ) |
company |
--workspace-id |
ID de l'espace de travail pour le stockage de l'espace de travail (obligatoire si document_location est workspace ) |
Aucune |
--batch-size |
Nombre maximal de chargements simultanés | 5 |
API sur l'état de la session de chargement
Un point de terminaison API dédié est disponible pour surveiller la progression de la session de chargement, vérifier quels documents ont été incorporés avec succès et identifier les éventuels échecs.
Point de terminaison : GET /api/v2/upload-session/{uuid:uuid}
Réponse : Le point de terminaison renvoie des informations sur la session de chargement, notamment son identifiant unique, son état de validité, les dates de création et de dernière mise à jour, ainsi qu'une liste des documents chargés avec leur état (intégrés ou non).
Script de chargement complet
Le script ci-dessous permet un suivi en temps réel et garantit la transparence du flux de traitement des documents :
# /// script
# requires-python = ">=3.7"
# dependencies = [
# "aiohttp",
# "tqdm",
# ]
# ///
import argparse
import asyncio
import json
import logging
import os
from pathlib import Path
import aiohttp
from tqdm.asyncio import tqdm
logger = logging.getLogger(__name__)
def configure_logging(log_path=None, clean_logs=False):
"""Configure the logging system"""
# Basic logger configuration
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
# Formatter for the logs
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
# Handler for the console
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
# Handler for the file if a path is specified
if log_path:
log_file = Path(log_path)
# Clean existing logs if requested
if clean_logs and log_file.exists():
log_file.unlink()
# Create parent directory if necessary
log_file.parent.mkdir(parents=True, exist_ok=True)
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)
root_logger.addHandler(file_handler)
logger.info(f"Logs will be saved to {log_file.absolute()}")
class AsyncUploadSession:
def __init__(
self, base_url: str, api_key: str, document_location: str, workspace_id: int | None = None, batch_size: int = 5
):
self.base_url = base_url
self.headers = {"Authorization": f"Bearer {api_key}"}
self.document_location = document_location
self.workspace_id = workspace_id
self.batch_size = batch_size
async def create_session(self, session: aiohttp.ClientSession) -> dict:
"""Create a new upload session"""
# First, deactivate any existing sessions
async with session.post(f"{self.base_url}/api/v2/upload-session/deactivate", headers=self.headers) as response:
await response.json()
logger.info("Successfully deactivated existing sessions")
# Then create a new session
async with session.post(f"{self.base_url}/api/v2/upload-session", headers=self.headers) as response:
response.raise_for_status()
session_data = await response.json()
logger.info("Successfully created a new session")
return session_data
async def upload_file(
self,
session: aiohttp.ClientSession,
session_id: str,
file_path: Path,
semaphore: asyncio.Semaphore,
) -> tuple[Path, dict]:
"""Upload a single file to the session with additional data"""
# Use the semaphore to limit concurrent calls
async with semaphore:
data = aiohttp.FormData()
data.add_field("description", "File upload")
data.add_field("session_id", session_id)
data.add_field("collection_type", self.document_location)
if self.document_location == "workspace" and self.workspace_id:
data.add_field("workspace_id", str(self.workspace_id))
with open(file_path, "rb") as file_content:
data.add_field("file", file_content, filename=file_path.name, content_type="application/octet-stream")
async with session.post(
f"{self.base_url}/api/v2/upload-session/{session_id}", headers=self.headers, data=data
) as response:
status = response.status
response_body = await response.text()
if status >= 400:
logger.error(f"Error {status} uploading file {file_path.name}: {response_body}")
response.raise_for_status()
result = json.loads(response_body)
return file_path, result
async def upload_files(self, files_dir: Path):
async with aiohttp.ClientSession() as session:
semaphore = asyncio.Semaphore(self.batch_size)
logger.info(f"Limiting concurrent uploads to {self.batch_size}")
session_data = await self.create_session(session)
session_id = session_data["uuid"]
# Retrieve the list of files (including in subfolders)
files = [f for f in files_dir.rglob("*") if f.is_file()]
tasks = [self.upload_file(session, session_id, file_path, semaphore) for file_path in files]
async def safe_task(task):
try:
return await task
except Exception as e:
return e
results = await tqdm.gather(*[safe_task(t) for t in tasks])
successful_uploads = []
failed_uploads = []
for file_path, result in zip(files, results):
if isinstance(result, Exception):
logger.error(f"Failed to upload file: {file_path.name}: {result}")
failed_uploads.append(file_path.name)
else:
successful_uploads.append(file_path.name)
# Display the summary
logger.info("=" * 50)
logger.info(f"Upload session summary for {len(files)} files:")
logger.info(f"✅ Successfully uploaded: {len(successful_uploads)} files")
if failed_uploads:
logger.info(f"❌ Failed to upload: {len(failed_uploads)} files")
logger.info("Failed files:")
for failed_file in failed_uploads:
logger.info(f" - {failed_file}")
logger.info("=" * 50)
return {"total": len(files), "successful": successful_uploads, "failed": failed_uploads}
async def main_async():
parser = argparse.ArgumentParser(description="Usable script to upload multiple files to Paradigm")
parser.add_argument("--api-key", default=None, help="Paradigm API key to use.")
parser.add_argument("--base-url", default="<http://localhost:8000>", help="Base url to use.")
parser.add_argument("--files-dir", type=Path, required=True, help="Directory containing files to upload")
parser.add_argument(
"--document-location",
type=str,
choices=["private", "company", "workspace"],
default="company",
help="Paradigm space to store the documents into. Defaults to the company space.",
)
parser.add_argument(
"--workspace-id",
type=int,
help="The ID of the workspace to store the documents into. "
"Only applicable if the document location is set to 'workspace'.",
)
parser.add_argument(
"--batch-size",
type=int,
default=5,
help="Maximum number of concurrent uploads to perform",
)
parser.add_argument(
"--log-path",
type=str,
default="upload_session.log",
help="Path where to save logs. If not provided, logs will only be displayed in console.",
)
parser.add_argument(
"--clean-logs",
action="store_true",
help="If set, existing log file will be deleted before starting.",
)
args = parser.parse_args()
# Configure the logging system
configure_logging(args.log_path, args.clean_logs)
if args.api_key is None:
api_key = os.getenv("PARADIGM_API_KEY", None)
else:
api_key = args.api_key
assert api_key is not None, "No API key provided."
# Check if the directory exists
if not args.files_dir.exists():
raise FileNotFoundError(f"Directory not found: {args.files_dir}")
logger.info(f"Using directory: {args.files_dir.absolute()}")
files = list(args.files_dir.glob("*"))
logger.info(f"Found files: {[f.name for f in files]}")
uploader = AsyncUploadSession(args.base_url, api_key, args.document_location, args.workspace_id, args.batch_size)
await uploader.upload_files(args.files_dir)
def main():
asyncio.run(main_async())
if __name__ == "__main__":
main()