Passer au contenu principal
En utilisant asyncio et la bibliothèque aiohttp, vous pouvez envoyer plusieurs requêtes simultanément, réduisant considérablement le temps d’attente et améliorant le temps de réponse pour les opérations en lot.
Efficacité et ÉvolutivitéLes requêtes asynchrones permettent des opérations non-bloquantes, rendant votre application plus efficace et évolutive, surtout lors de multiples appels API simultanés.

Prérequis

  • Assurez-vous d’avoir la bibliothèque aiohttp installée : pip install aiohttp
  • Une clé API stockée de manière sécurisée, de préférence dans une variable d’environnement PARADIGM_API_KEY.

Programmation Asynchrone avec asyncio

asyncio est une bibliothèque Python qui fournit un framework pour écrire du code concurrent en utilisant la syntaxe async et await. Elle est particulièrement utile pour le code réseau structuré de haut niveau et les opérations liées aux E/S. Vous pouvez trouver plus d’informations sur Asyncio ici.

Configuration du Client HTTP Asynchrone

Pour utiliser les fonctionnalités asynchrones, utilisez aiohttp.ClientSession. Initialisez la session avec votre clé API et l’endpoint approprié.
import aiohttp
import asyncio
import os

# Récupérer les informations d'authentification
api_key = os.getenv("PARADIGM_API_KEY")
base_url = os.getenv("PARADIGM_BASE_URL", "https://paradigm.lighton.ai/api/v2")

names = ["Alice", "Bob", "Charlie", "David", "Emma", "Frank", "Grace", "Hannah", "Ian", "Jessica", "Kevin", "Linda", "Michael", "Nancy", "Olivia", "Peter", "Quincy", "Rachel", "Samuel", "Tiffany"]

# comme exemple nous utiliserons ce lot de messages
messages_batch = [[{"role": "user", "content": f"Dis bonjour au nouvel utilisateur sur Paradigm ! Donne un accueil court d'une phrase hautement personnalisé à : {name}"}] for name in names]

Appels API Asynchrones

Voici comment implémenter des appels API asynchrones :
Créez une fonction async qui envoie un message à l’API et attend la réponse.
async def send_message(session, messages, model="alfred-4.2", temperature=0.7, max_tokens=150):
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    payload = {
        "model": model,
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens
    }

    async with session.post(f"{base_url}/chat/completions", headers=headers, json=payload) as response:
        data = await response.json()
        return data
Utilisez asyncio.gather pour envoyer plusieurs requêtes simultanément. Cette fonction attend que tous les futures (opérations asynchrones) se terminent.
async def main():
    start_time = time.time()

    async with aiohttp.ClientSession() as session:
        tasks = [send_message(session, messages, model="alfred-4.2", temperature=0.4) for messages in messages_batch]
        responses = await asyncio.gather(*tasks)

    duration = time.time() - start_time
    print(f"L'exécution asynchrone a pris {duration} secondes.")

    for response in responses:
        print(response["choices"][0]["message"]["content"])

    return responses
Utilisez asyncio.run() pour exécuter la fonction main, qui gère toutes les opérations asynchrones.
if __name__ == "__main__":
    asyncio.run(main())

Exemple complet

Voici le code d’exemple dans son ensemble:
import aiohttp
import asyncio
import time
import os

# Récupérer les informations d'authentification
api_key = os.getenv("PARADIGM_API_KEY")
base_url = os.getenv("PARADIGM_BASE_URL", "https://paradigm.lighton.ai/api/v2")

names = ["Alice", "Bob", "Charlie", "David", "Emma", "Frank", "Grace", "Hannah", "Ian", "Jessica", "Kevin", "Linda", "Michael", "Nancy", "Olivia", "Peter", "Quincy", "Rachel", "Samuel", "Tiffany"]

# comme exemple nous utiliserons ce lot de messages
messages_batch = [[{"role": "user", "content": f"Dis bonjour au nouvel utilisateur sur Paradigm ! Donne un accueil court d'une phrase hautement personnalisé à : {name}"}] for name in names]

async def send_message(session, messages, model="alfred-4.2", temperature=0.7, max_tokens=150):
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    payload = {
        "model": model,
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens
    }

    async with session.post(f"{base_url}/chat/completions", headers=headers, json=payload) as response:
        data = await response.json()
        return data

async def main():
    start_time = time.time()

    async with aiohttp.ClientSession() as session:
        tasks = [send_message(session, messages, model="alfred-4.2", temperature=0.4) for messages in messages_batch]
        responses = await asyncio.gather(*tasks)

    duration = time.time() - start_time
    print(f"L'exécution asynchrone a pris {duration:.2f} secondes.")

    for response in responses:
        print(response["choices"][0]["message"]["content"])

    return responses

if __name__ == "__main__":
    asyncio.run(main())

Comparaison avec l’Exécution Synchrone

Lors de la comparaison de l’exécution asynchrone avec l’exécution synchrone traditionnelle (séquentielle), les opérations asynchrones se terminent généralement en beaucoup moins de temps, avec un potentiel d’amélioration encore plus grand selon la longueur des différentes requêtes. Cela est particulièrement vrai pour les tâches liées aux E/S comme les requêtes API. Les gains d’efficacité de l’exécution asynchrone proviennent de sa nature non-bloquante, qui permet à d’autres tâches de continuer sans attendre que les opérations d’E/S se terminent.
Bonnes Pratiques
  • Utilisez toujours await avec les fonctions async pour éviter les erreurs d’exécution.
  • Réutilisez la même ClientSession pour plusieurs requêtes pour améliorer les performances.
  • Fermez toujours la session correctement en utilisant le gestionnaire de contexte async with.
  • Pour les notebooks Jupyter, exécutez le code asynchrone via un script Python séparé en utilisant la commande magique !python file_to_execute.py dans une cellule pour éviter les problèmes de boucle d’événements.
En incorporant des requêtes asynchrones dans votre application, vous pouvez atteindre une plus grande efficacité et évolutivité, particulièrement lors de la gestion d’un grand nombre d’appels API.

Exemple Complet de la Comparaison

Pour comparer les appels API synchrones et asynchrones dans un scénario pratique, vous pouvez utiliser l’extrait suivant. Cet extrait créera un fichier Python, speed_test.py, implémentant des requêtes API synchrones et asynchrones, respectivement. Vous pouvez ensuite exécuter ce script pour observer la différence de temps d’exécution, démontrant l’efficacité de la programmation asynchrone pour les requêtes en lot.
speed_test.py
import os
import time
import requests
import aiohttp
import asyncio

# Configuration
api_key = os.getenv("PARADIGM_API_KEY")
base_url = os.getenv("PARADIGM_BASE_URL", "https://paradigm.lighton.ai/api/v2")

names = ["Alice", "Bob", "Charlie", "David", "Emma", "Frank", "Grace", "Hannah", "Ian", "Jessica", "Kevin", "Linda", "Michael", "Nancy", "Olivia", "Peter", "Quincy", "Rachel", "Samuel", "Tiffany"]

# Fonction synchrone pour envoyer des messages
def sync_send_message(name):
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    messages = [{"role": "user", "content": f"Dis bonjour au nouvel utilisateur sur Paradigm ! Donne un accueil court d'une phrase hautement personnalisé à : {name}"}]

    payload = {
        "model": "alfred-4.2",
        "messages": messages,
        "temperature": 0.4,
        "max_tokens": 150
    }

    response = requests.post(f"{base_url}/chat/completions", headers=headers, json=payload)
    return response.json()

# Fonction asynchrone pour envoyer des messages
async def async_send_message(session, messages, model="alfred-4.2", temperature=0.4, max_tokens=150):
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    payload = {
        "model": model,
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens
    }

    async with session.post(f"{base_url}/chat/completions", headers=headers, json=payload) as response:
        data = await response.json()
        return data

def sync_main():
    responses = []
    start_time = time.time()
    for name in names:
        response = sync_send_message(name)
        responses.append(response)
    duration = time.time() - start_time
    print(f"L'exécution synchrone a pris {duration:.2f} secondes.")

async def async_main():
    start_time = time.time()

    async with aiohttp.ClientSession() as session:
        tasks = [
            async_send_message(
                session,
                [{"role": "user", "content": f"Dis bonjour au nouvel utilisateur sur Paradigm ! Donne un accueil court d'une phrase hautement personnalisé à : {name}"}],
                model="alfred-4.2",
                temperature=0.4
            )
            for name in names
        ]
        responses = await asyncio.gather(*tasks)

    duration = time.time() - start_time
    print(f"L'exécution asynchrone a pris {duration:.2f} secondes.")

if __name__ == "__main__":
    async_start_time = time.time()
    asyncio.run(async_main())
    async_times = time.time() - async_start_time

    sync_start_time = time.time()
    sync_main()
    sync_times = time.time() - sync_start_time

    improvement_factor = sync_times / async_times

    print(f"Facteur d'amélioration : {improvement_factor:.2f}")
Pour exécuter la comparaison :
  1. Exécutez l’extrait de code ci-dessus dans une cellule de notebook Jupyter pour créer speed_test.py.
  2. Exécutez le script dans le notebook Jupyter ou un terminal en utilisant la commande !python speed_test.py.
Ce script exécutera d’abord la version asynchrone, affichant le temps d’exécution total. Il exécutera ensuite la version synchrone, faisant de même. Comparer les deux temps d’exécution illustrera les gains d’efficacité réalisables avec les appels API asynchrones. Dans notre cas, nous avons obtenu la sortie suivante :
L'exécution asynchrone a pris 1.86 secondes.
L'exécution synchrone a pris 10.33 secondes.
Facteur d'amélioration : 5.55

Gestion des Erreurs et Timeout

Il est important d’ajouter une gestion robuste des erreurs pour les appels API asynchrones :
async def send_message_with_retry(session, messages, model="alfred-4.2", temperature=0.7, max_tokens=150, max_retries=3):
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    payload = {
        "model": model,
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens
    }

    for attempt in range(max_retries):
        try:
            async with session.post(
                f"{base_url}/chat/completions",
                headers=headers,
                json=payload,
                timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                response.raise_for_status()
                data = await response.json()
                return data
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Backoff exponentiel

Conclusion

Exploiter les requêtes API asynchrones via aiohttp peut considérablement améliorer les performances et l’évolutivité des applications. Comme démontré, l’exécution asynchrone peut être presque 5 fois plus rapide que les méthodes synchrones, offrant des gains d’efficacité significatifs. Cette approche est essentielle pour gérer des interactions API à haut volume, garantissant l’efficacité des applications.