Contexte:
Je suis en train de crawler des produits sur des sites marchands dans le cadre du développement de mon agent IA qui fait du commerce sur Amazon.
Ma première implémentation se base sur les sitemaps contenant plusieurs lignes de produits.
Côté base de données, j’avais une implémentation naïve (je suis un adepte du principe “Keep It Simple and Stupid” 😁). À chaque fois que la fonction get_database()
est appelée, un “engine” SQLAlchemy est créé et renvoyé. C’est parfait quand on a quelques centaines de produits à insérer, mais pour des milliers comme dans mon cas, l’application se heurte aux problèmes de connexion maximale à la base de données.
C’est dans ce contexte que j’ai revu quelques parties de mon code.
Connexion à la base de données
Ancienne version
def get_database():
"""Lazily initialize the database connection."""
return create_engine(settings.DB_URL, echo=settings.DB_DEBUG)
Nouvelle implémentation avec un singleton
_engine = None
def get_database():
"""Lazily initialize the database connection with a configured connection pool."""
global _engine
if _engine is None:
_engine = create_engine(
settings.DB_URL,
echo=settings.DB_DEBUG,
pool_size=100, # Adjust pool size based on your requirements
max_overflow=5 # Allow up to 5 additional connections beyond the pool size
)
return _engine
Batch d’insertion des produits dans la base de données
Ancienne version
if len(new_products) > 0:
Product.bulk_insert(new_products)
logger.info(f"{len(new_products)} products inserted.")
if len(old_products) > 0:
Product.bulk_insert(old_products)
logger.info(f"{len(old_products)} products updated.")
Nouvelle implémentation avec des chunks
# Insert new products in smaller batches to avoid overwhelming the database
if len(new_products) > 0:
batch_size = 100 # Adjust batch size as needed
for i in range(0, len(new_products), batch_size):
Product.bulk_insert(new_products[i:i + batch_size])
logger.info(f"{len(new_products[i:i + batch_size])} products inserted.")
# Update old products in smaller batches
if len(old_products) > 0:
batch_size = 100 # Adjust batch size as needed
for i in range(0, len(old_products), batch_size):
Product.bulk_insert(old_products[i:i + batch_size])
logger.info(f"{len(old_products[i:i + batch_size])} products updated.")
Conclusion
En conclusion, ces optimisations permettent de mieux gérer les connexions à la base de données et d’éviter les erreurs liées à un trop grand nombre de connexions simultanées. L’utilisation d’un singleton pour le moteur de base de données garantit une réutilisation efficace des ressources, tandis que le traitement par lots réduit la charge sur la base de données lors des insertions et des mises à jour massives. Ces ajustements améliorent considérablement la robustesse et les performances de l’application.