""" Worker principal pour le traitement des images de chèques """ import os import sys import time import redis import logging import signal import argparse import importlib from rq import Connection, Worker, Queue from rq.job import Job from rq.worker import SimpleWorker from dotenv import load_dotenv # Configuration du logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger("cheque_scanner_worker") # Ajouter les modules au path sys.path.append('/app/shared') sys.path.append('/app/api_app') sys.path.append('/app') # Importer notre module tasks local try: from tasks import process_cheque_image logger.info("Fonction process_cheque_image importée avec succès") except ImportError as e: logger.error(f"Erreur lors de l'importation directe de process_cheque_image: {str(e)}") # Charger les variables d'environnement load_dotenv() # Classe Worker personnalisée qui remplace l'importation de fonction class CustomWorker(SimpleWorker): def perform_job(self, job, queue): """ Remplace l'importation de app.tasks.process_cheque_image par notre fonction locale """ # Si la tâche est app.tasks.process_cheque_image, remplacer par notre fonction locale if job.func_name == 'app.tasks.process_cheque_image' or job.func_name == 'process_cheque_image' or job.func_name == 'tasks.process_cheque_image': job.func_name = 'tasks.process_cheque_image' job._func = process_cheque_image logger.info(f"Fonction remplacée pour la tâche {job.id}") return super().perform_job(job, queue) # Variables d'environnement REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0") QUEUE_NAME = os.getenv("QUEUE_NAME", "cheque_processing") HIGH_PRIORITY_QUEUE_NAME = os.getenv("HIGH_PRIORITY_QUEUE_NAME", "cheque_processing_high") WORKER_NAME = os.getenv("WORKER_NAME", f"worker-{os.getpid()}") # Créer les dossiers nécessaires UPLOAD_FOLDER = os.getenv("UPLOAD_FOLDER", "/app/data/uploads") RESULT_FOLDER = os.getenv("RESULT_FOLDER", "/app/data/results") TEMP_FOLDER = os.getenv("TEMP_FOLDER", "/app/data/tmp") os.makedirs(UPLOAD_FOLDER, exist_ok=True) os.makedirs(RESULT_FOLDER, exist_ok=True) os.makedirs(TEMP_FOLDER, exist_ok=True) def signal_handler(signum, frame): """ Gestionnaire de signal pour arrêter proprement le worker """ logger.info(f"Signal {signum} reçu, arrêt du worker...") sys.exit(0) def start_worker(queues, worker_name=None): """ Démarre un worker pour traiter les tâches des files d'attente spécifiées Args: queues (list): Liste des noms de files d'attente à traiter worker_name (str, optional): Nom du worker. Si None, un nom sera généré """ if not worker_name: worker_name = WORKER_NAME logger.info(f"Démarrage du worker '{worker_name}' sur les files: {', '.join(queues)}") # Connexion à Redis redis_conn = redis.Redis.from_url(REDIS_URL) try: # Vérifier la connexion à Redis redis_conn.ping() logger.info("Connexion à Redis établie") except redis.exceptions.ConnectionError: logger.error(f"Impossible de se connecter à Redis: {REDIS_URL}") sys.exit(1) # Configurer le gestionnaire de signal signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) # Vérifier la connexion à Redis logger.info("Vérification de la connexion à Redis") # Démarrer le worker avec notre classe personnalisée with Connection(redis_conn): # Créer les queues worker_queues = [Queue(name, connection=redis_conn) for name in queues] # Créer le worker personnalisé worker = CustomWorker( worker_queues, name=worker_name ) logger.info(f"Worker '{worker_name}' prêt à traiter les tâches") worker.work(with_scheduler=True) if __name__ == "__main__": parser = argparse.ArgumentParser(description="Worker pour le traitement des images de chèques") parser.add_argument( "--queues", nargs="+", default=[HIGH_PRIORITY_QUEUE_NAME, QUEUE_NAME], help="Liste des files d'attente à traiter (par ordre de priorité)" ) parser.add_argument( "--name", type=str, default=None, help="Nom du worker" ) args = parser.parse_args() # Démarrer le worker start_worker(args.queues, args.name)