diff --git a/api/app/config.py b/api/app/config.py index c4828bb..7d1f0a6 100644 --- a/api/app/config.py +++ b/api/app/config.py @@ -39,12 +39,20 @@ class Settings(BaseSettings): UPLOAD_FOLDER: str = os.getenv("UPLOAD_FOLDER", "/app/data/uploads") RESULT_FOLDER: str = os.getenv("RESULT_FOLDER", "/app/data/results") TEMP_FOLDER: str = os.getenv("TEMP_FOLDER", "/app/data/tmp") + CONVERTED_FOLDER: str = os.getenv("CONVERTED_FOLDER", "/app/data/converted") MAX_CONTENT_LENGTH: int = int(os.getenv("MAX_CONTENT_LENGTH", "16777216")) # 16MB ALLOWED_EXTENSIONS: set = {"png", "jpg", "jpeg", "gif", "tiff", "pdf"} + # Formats modernes (nécessitant conversion) + MODERN_EXTENSIONS: set = {"heic", "heif", "webp", "dng", "cr2", "arw", "nef", "raw"} + + # Configuration de conversion + CONVERSION_QUALITY: int = int(os.getenv("CONVERSION_QUALITY", "95")) # Qualité JPEG/PNG + DEFAULT_OUTPUT_FORMAT: str = os.getenv("DEFAULT_OUTPUT_FORMAT", "jpg") + # Configuration du traitement d'image - DEFAULT_OCR_LANGUAGE: str = os.getenv("DEFAULT_OCR_LANGUAGE", "eng") - ALTERNATIVE_OCR_LANGUAGE: str = os.getenv("ALTERNATIVE_OCR_LANGUAGE", "fra") + DEFAULT_OCR_LANGUAGE: str = os.getenv("DEFAULT_OCR_LANGUAGE", "fra") + ALTERNATIVE_OCR_LANGUAGE: str = os.getenv("ALTERNATIVE_OCR_LANGUAGE", "eng") TESSERACT_DATA_PATH: str = os.getenv("TESSERACT_DATA_PATH", "/usr/share/tesseract-ocr/4.00/tessdata") # Délais et timeouts @@ -66,4 +74,5 @@ settings = Settings() # Créer les dossiers nécessaires s'ils n'existent pas os.makedirs(settings.UPLOAD_FOLDER, exist_ok=True) os.makedirs(settings.RESULT_FOLDER, exist_ok=True) -os.makedirs(settings.TEMP_FOLDER, exist_ok=True) \ No newline at end of file +os.makedirs(settings.TEMP_FOLDER, exist_ok=True) +os.makedirs(settings.CONVERTED_FOLDER, exist_ok=True) \ No newline at end of file diff --git a/api/app/conversion.py b/api/app/conversion.py new file mode 100644 index 0000000..784b0df --- /dev/null +++ b/api/app/conversion.py @@ -0,0 +1,238 @@ +""" +Module de conversion pour les formats d'images modernes +Supporte les formats courants des appareils Android et iOS +""" + +import os +import subprocess +from pathlib import Path +import tempfile +import shutil +import logging +from typing import List, Optional, Tuple + +from .config import settings + +# Configuration du logging +logger = logging.getLogger("cheque_scanner_api.conversion") + +# Formats modernes pris en charge +MODERN_FORMATS = { + # Formats iPhone + "heic": "HEIC (High Efficiency Image Format) - iPhone", + "heif": "HEIF (High Efficiency Image Format) - iPhone", + + # Formats Android haute qualité + "webp": "WebP - Android", + "dng": "DNG (Digital Negative) - Android RAW", + + # Formats communs haute résolution + "cr2": "Canon RAW Format", + "arw": "Sony RAW Format", + "nef": "Nikon RAW Format", + "raw": "Format RAW générique", +} + +# Mapping des extensions aux formats de sortie recommandés +FORMAT_MAPPING = { + "heic": "jpg", + "heif": "jpg", + "webp": "png", + "dng": "jpg", + "cr2": "jpg", + "arw": "jpg", + "nef": "jpg", + "raw": "jpg", +} + +def is_modern_format(filename: str) -> bool: + """ + Vérifie si le fichier est dans un format moderne qui nécessite une conversion + + Args: + filename (str): Nom du fichier à vérifier + + Returns: + bool: True si le fichier est dans un format moderne, False sinon + """ + ext = Path(filename).suffix.lower().lstrip(".") + return ext in MODERN_FORMATS + +def get_supported_input_formats() -> List[str]: + """ + Renvoie la liste des formats d'entrée pris en charge + + Returns: + List[str]: Liste des extensions de fichiers pris en charge + """ + return list(MODERN_FORMATS.keys()) + list(settings.ALLOWED_EXTENSIONS) + +def convert_heic_to_jpg(input_path: str, output_path: str) -> bool: + """ + Convertit un fichier HEIC en JPG en utilisant pillow-heif + + Args: + input_path (str): Chemin du fichier HEIC + output_path (str): Chemin de sortie pour le fichier JPG + + Returns: + bool: True si la conversion a réussi, False sinon + """ + try: + import pillow_heif + from PIL import Image + + # Enregistrer le décodeur HEIF + pillow_heif.register_heif_opener() + + # Ouvrir et convertir l'image + img = Image.open(input_path) + img.convert("RGB").save(output_path, format="JPEG", quality=95) + return True + + except ImportError: + logger.warning("pillow-heif n'est pas installé, utilisation de la méthode alternative") + return _convert_with_imagemagick(input_path, output_path) + except Exception as e: + logger.error(f"Erreur lors de la conversion HEIC: {str(e)}") + return False + +def convert_webp_to_png(input_path: str, output_path: str) -> bool: + """ + Convertit un fichier WebP en PNG + + Args: + input_path (str): Chemin du fichier WebP + output_path (str): Chemin de sortie pour le fichier PNG + + Returns: + bool: True si la conversion a réussi, False sinon + """ + try: + from PIL import Image + + img = Image.open(input_path) + img.save(output_path, format="PNG") + return True + + except Exception as e: + logger.error(f"Erreur lors de la conversion WebP: {str(e)}") + return _convert_with_imagemagick(input_path, output_path) + +def _convert_with_imagemagick(input_path: str, output_path: str) -> bool: + """ + Méthode alternative utilisant ImageMagick pour la conversion + + Args: + input_path (str): Chemin du fichier d'entrée + output_path (str): Chemin du fichier de sortie + + Returns: + bool: True si la conversion a réussi, False sinon + """ + try: + # Vérifier si ImageMagick est installé + subprocess.run(["which", "convert"], check=True, capture_output=True) + + # Conversion avec ImageMagick + result = subprocess.run( + ["convert", input_path, output_path], + check=True, + capture_output=True + ) + return result.returncode == 0 + except subprocess.CalledProcessError as e: + logger.error(f"Erreur ImageMagick: {e.stderr.decode() if e.stderr else str(e)}") + return False + except Exception as e: + logger.error(f"Erreur lors de la conversion avec ImageMagick: {str(e)}") + return False + +def convert_raw_to_jpg(input_path: str, output_path: str) -> bool: + """ + Convertit un fichier RAW (DNG, CR2, ARW, NEF, etc.) en JPG + + Args: + input_path (str): Chemin du fichier RAW + output_path (str): Chemin de sortie pour le fichier JPG + + Returns: + bool: True si la conversion a réussi, False sinon + """ + try: + # Essayer d'abord avec rawpy + import rawpy + import imageio + + with rawpy.imread(input_path) as raw: + rgb = raw.postprocess(use_camera_wb=True, half_size=False, no_auto_bright=False) + imageio.imsave(output_path, rgb) + return True + + except ImportError: + logger.warning("rawpy n'est pas installé, utilisation de la méthode alternative") + return _convert_with_imagemagick(input_path, output_path) + except Exception as e: + logger.error(f"Erreur lors de la conversion RAW: {str(e)}") + return _convert_with_imagemagick(input_path, output_path) + +def convert_file(input_path: str, output_dir: str = None) -> Tuple[bool, Optional[str], Optional[str]]: + """ + Convertit un fichier d'un format moderne vers un format standard + + Args: + input_path (str): Chemin du fichier à convertir + output_dir (str, optional): Répertoire de sortie pour le fichier converti + Si non spécifié, utilise le même répertoire que le fichier d'entrée + + Returns: + Tuple[bool, Optional[str], Optional[str]]: + - Succès de la conversion (bool) + - Chemin du fichier converti (str ou None si échec) + - Message d'erreur (str ou None si succès) + """ + input_path = Path(input_path) + + # Vérifier si le fichier existe + if not input_path.exists(): + return False, None, f"Le fichier {input_path} n'existe pas" + + # Obtenir l'extension du fichier + ext = input_path.suffix.lower().lstrip(".") + + # Si l'extension est déjà dans les formats autorisés, pas besoin de conversion + if ext in settings.ALLOWED_EXTENSIONS: + return True, str(input_path), None + + # Vérifier si le format est pris en charge + if ext not in MODERN_FORMATS: + return False, None, f"Format non pris en charge: {ext}" + + # Déterminer le format de sortie + output_ext = FORMAT_MAPPING.get(ext, "jpg") + + # Déterminer le chemin de sortie + if output_dir: + output_dir = Path(output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + output_path = output_dir / f"{input_path.stem}.{output_ext}" + else: + output_path = input_path.with_suffix(f".{output_ext}") + + # Effectuer la conversion en fonction du format + success = False + + if ext in ["heic", "heif"]: + success = convert_heic_to_jpg(str(input_path), str(output_path)) + elif ext == "webp": + success = convert_webp_to_png(str(input_path), str(output_path)) + elif ext in ["dng", "cr2", "arw", "nef", "raw"]: + success = convert_raw_to_jpg(str(input_path), str(output_path)) + else: + # Fallback sur ImageMagick pour les autres formats + success = _convert_with_imagemagick(str(input_path), str(output_path)) + + if success: + return True, str(output_path), None + else: + return False, None, f"Échec de la conversion du fichier {input_path}" \ No newline at end of file diff --git a/api/app/main.py b/api/app/main.py index a280e27..ceabf00 100644 --- a/api/app/main.py +++ b/api/app/main.py @@ -6,12 +6,14 @@ import os import time import uuid import shutil +import json from typing import List, Optional -from fastapi import FastAPI, File, UploadFile, HTTPException, Depends, Header, BackgroundTasks, Query, Request +from fastapi import FastAPI, File, UploadFile, HTTPException, Depends, Header, BackgroundTasks, Query, Request, Path from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import JSONResponse +from fastapi.responses import JSONResponse, FileResponse from fastapi.staticfiles import StaticFiles from datetime import datetime, timedelta +from pathlib import Path as FilePath import redis from rq import Queue from rq.job import Job @@ -20,9 +22,11 @@ import logging from .config import settings from .schemas import ( - UploadResponse, JobStatusResponse, JobResult, - ExtractionResult, HealthCheck, ErrorResponse, JobStatus + UploadResponse, JobStatusResponse, JobResult, + ExtractionResult, HealthCheck, ErrorResponse, JobStatus, + ConversionResponse, ConversionStatus ) +from .conversion import is_modern_format, convert_file, get_supported_input_formats from .dependencies import verify_api_key, get_redis_connection from .tasks import process_cheque_image @@ -52,8 +56,9 @@ app.add_middleware( allow_headers=["*"], ) -# Configuration des fichiers statiques pour accéder aux images de résultats +# Configuration des fichiers statiques pour accéder aux images de résultats et fichiers convertis app.mount("/static", StaticFiles(directory=settings.RESULT_FOLDER), name="static") +app.mount("/static/converted", StaticFiles(directory=settings.CONVERTED_FOLDER), name="converted_files") # Variable pour stocker le temps de démarrage start_time = time.time() @@ -183,6 +188,255 @@ async def upload_image( ) +@app.post( + f"{settings.API_PREFIX}/convert", + response_model=ConversionResponse, + tags=["Conversion"], + summary="Convertit un fichier d'un format moderne vers un format standard" +) +async def convert_image( + file: UploadFile = File(...), + target_format: Optional[str] = Query(None, description="Format cible (jpg, png)"), + api_key: str = Depends(verify_api_key) +): + """ + Convertit une image d'un format moderne (HEIC, WebP, RAW, etc.) vers un format standard (JPG, PNG) + + Cette API prend en charge les formats suivants: + - Formats iPhone: HEIC, HEIF + - Formats Android haute qualité: WebP, DNG + - Formats RAW: CR2, ARW, NEF, RAW + + La conversion est effectuée côté serveur et le fichier converti est renvoyé. + """ + # Vérifier si le format d'entrée nécessite une conversion + filename = file.filename + file_ext = FilePath(filename).suffix.lower().lstrip(".") + + # Si le format est déjà pris en charge, renvoyer une erreur + if file_ext in settings.ALLOWED_EXTENSIONS and not target_format: + return ConversionResponse( + original_filename=filename, + original_format=file_ext, + status=ConversionStatus.FAILED, + message="Le fichier est déjà dans un format pris en charge, aucune conversion nécessaire" + ) + + # Si le format n'est pas pris en charge du tout + if file_ext not in settings.ALLOWED_EXTENSIONS and file_ext not in settings.MODERN_EXTENSIONS: + return ConversionResponse( + original_filename=filename, + original_format=file_ext, + status=ConversionStatus.FAILED, + message=f"Format non pris en charge: {file_ext}. Formats supportés: {', '.join(get_supported_input_formats())}" + ) + + # Sauvegarder le fichier temporairement + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + temp_filename = f"{timestamp}_{FilePath(filename).stem}.{file_ext}" + temp_path = FilePath(settings.TEMP_FOLDER) / temp_filename + + try: + with open(temp_path, "wb") as buffer: + shutil.copyfileobj(file.file, buffer) + except Exception as e: + logger.error(f"Erreur lors de la sauvegarde du fichier temporaire: {str(e)}") + return ConversionResponse( + original_filename=filename, + original_format=file_ext, + status=ConversionStatus.FAILED, + message=f"Erreur lors de la sauvegarde du fichier: {str(e)}" + ) + + # Déterminer le format de sortie si spécifié + output_format = target_format if target_format else None + + # Effectuer la conversion + success, converted_path, error_message = convert_file( + str(temp_path), + settings.CONVERTED_FOLDER + ) + + # Nettoyer le fichier temporaire + if FilePath(temp_path).exists(): + FilePath(temp_path).unlink() + + if not success: + return ConversionResponse( + original_filename=filename, + original_format=file_ext, + status=ConversionStatus.FAILED, + message=error_message if error_message else "Échec de la conversion pour une raison inconnue" + ) + + # Préparer le chemin relatif pour l'URL + converted_filename = FilePath(converted_path).name + converted_format = FilePath(converted_path).suffix.lower().lstrip(".") + + # Construire l'URL pour accéder au fichier converti + converted_url = f"/static/converted/{converted_filename}" + + return ConversionResponse( + original_filename=filename, + original_format=file_ext, + converted_filename=converted_filename, + converted_format=converted_format, + converted_path=converted_url, + status=ConversionStatus.SUCCESS, + message=f"Conversion réussie de {file_ext} vers {converted_format}" + ) + + +@app.post( + f"{settings.API_PREFIX}/convert-and-process", + response_model=UploadResponse, + tags=["Conversion"], + status_code=202, + summary="Convertit un fichier et lance son traitement" +) +async def convert_and_process_image( + background_tasks: BackgroundTasks, + file: UploadFile = File(...), + priority: bool = Query(False, description="Traiter avec une priorité élevée"), + api_key: str = Depends(verify_api_key), + redis_conn: redis.Redis = Depends(get_redis_connection) +): + """ + Convertit une image d'un format moderne vers un format standard, puis lance son traitement d'extraction + + Cette API combine la conversion et le traitement en une seule étape pour les fichiers provenant + d'appareils modernes (iPhone, Android haut de gamme). + """ + # Vérifier le format du fichier + filename = file.filename + file_ext = FilePath(filename).suffix.lower().lstrip(".") + + # Créer un identifiant unique pour la tâche + job_id = str(uuid.uuid4()) + + # Si le format est déjà pris en charge, procéder directement au traitement + if file_ext in settings.ALLOWED_EXTENSIONS: + # Créer le chemin de fichier + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + new_filename = f"{timestamp}_{job_id}.{file_ext}" + file_path = os.path.join(settings.UPLOAD_FOLDER, new_filename) + + # Sauvegarder le fichier + try: + with open(file_path, "wb") as buffer: + shutil.copyfileobj(file.file, buffer) + except Exception as e: + logger.error(f"Erreur lors de la sauvegarde du fichier: {str(e)}") + raise HTTPException( + status_code=500, + detail=f"Erreur lors de la sauvegarde de l'image: {str(e)}" + ) + + # Si le format nécessite une conversion + elif file_ext in settings.MODERN_EXTENSIONS: + # Sauvegarder le fichier temporairement + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + temp_filename = f"{timestamp}_{job_id}.{file_ext}" + temp_path = FilePath(settings.TEMP_FOLDER) / temp_filename + + try: + with open(temp_path, "wb") as buffer: + shutil.copyfileobj(file.file, buffer) + except Exception as e: + logger.error(f"Erreur lors de la sauvegarde du fichier temporaire: {str(e)}") + raise HTTPException( + status_code=500, + detail=f"Erreur lors de la sauvegarde de l'image: {str(e)}" + ) + + # Effectuer la conversion + success, converted_path, error_message = convert_file( + str(temp_path), + settings.UPLOAD_FOLDER + ) + + # Nettoyer le fichier temporaire + if FilePath(temp_path).exists(): + FilePath(temp_path).unlink() + + if not success: + raise HTTPException( + status_code=500, + detail=f"Erreur lors de la conversion: {error_message if error_message else 'Raison inconnue'}" + ) + + # Utiliser le fichier converti + file_path = converted_path + + else: + # Format non pris en charge + raise HTTPException( + status_code=400, + detail=f"Format de fichier non pris en charge. Formats acceptés: " + + f"{', '.join(settings.ALLOWED_EXTENSIONS)} et {', '.join(settings.MODERN_EXTENSIONS)}" + ) + + # Déterminer la file d'attente en fonction de la priorité + queue_name = settings.HIGH_PRIORITY_QUEUE_NAME if priority else settings.QUEUE_NAME + queue = Queue(queue_name, connection=redis_conn) + + # Créer une tâche RQ + try: + # Utiliser directement le nom complet du module dans le worker + queue_job = queue.enqueue( + 'tasks.process_cheque_image', + job_id, + file_path, + result_ttl=settings.RESULT_TTL, + timeout=settings.JOB_TIMEOUT + ) + + # Stocker les métadonnées dans Redis + redis_conn.hset(f"job:{job_id}", mapping={ + "status": JobStatus.PENDING.value, + "created_at": datetime.now().isoformat(), + "file_path": file_path, + "filename": filename, + "priority": str(priority).lower(), + "converted": "true" if file_ext in settings.MODERN_EXTENSIONS else "false" + }) + + logger.info(f"Tâche créée: {job_id} - Fichier: {filename} (converti: {file_ext in settings.MODERN_EXTENSIONS})") + + return UploadResponse( + job_id=job_id, + status=JobStatus.PENDING, + message=f"Image {'convertie et ' if file_ext in settings.MODERN_EXTENSIONS else ''}en file d'attente pour traitement (file {'prioritaire' if priority else 'standard'})" + ) + + except Exception as e: + logger.error(f"Erreur lors de la création de la tâche: {str(e)}") + # Supprimer le fichier en cas d'erreur + if os.path.exists(file_path): + os.remove(file_path) + raise HTTPException( + status_code=500, + detail=f"Erreur lors de la création de la tâche: {str(e)}" + ) + + +@app.get( + f"{settings.API_PREFIX}/supported-formats", + response_model=List[str], + tags=["Conversion"], + summary="Liste tous les formats de fichiers pris en charge" +) +async def list_supported_formats( + api_key: str = Depends(verify_api_key) +): + """ + Renvoie la liste de tous les formats de fichiers pris en charge par l'API + + Cela inclut à la fois les formats standards et les formats modernes qui nécessitent une conversion. + """ + return get_supported_input_formats() + + @app.get( f"{settings.API_PREFIX}/status/{{job_id}}", response_model=JobStatusResponse, @@ -279,7 +533,7 @@ async def get_job_result( # Charger les résultats depuis Redis result_data = job_data.get("result") if result_data: - result_dict = eval(result_data) # Attention: eval n'est pas sécurisé en production + result_dict = json.loads(result_data) # Utiliser json.loads au lieu de eval result = ExtractionResult(**result_dict) texte_brut = job_data.get("texte_brut") diff --git a/api/app/schemas.py b/api/app/schemas.py index c94f726..2535b2e 100644 --- a/api/app/schemas.py +++ b/api/app/schemas.py @@ -8,6 +8,23 @@ from enum import Enum from datetime import datetime +class ConversionStatus(str, Enum): + """Statuts possibles pour une conversion de fichier""" + SUCCESS = "success" + FAILED = "failed" + + +class ConversionResponse(BaseModel): + """Réponse à une demande de conversion de fichier""" + original_filename: str = Field(..., description="Nom du fichier original") + original_format: str = Field(..., description="Format du fichier original") + converted_filename: Optional[str] = Field(None, description="Nom du fichier converti") + converted_format: Optional[str] = Field(None, description="Format du fichier converti") + converted_path: Optional[str] = Field(None, description="Chemin vers le fichier converti") + status: ConversionStatus = Field(..., description="Statut de la conversion") + message: str = Field(..., description="Message d'information ou d'erreur") + + class JobStatus(str, Enum): """Statuts possibles pour une tâche d'extraction""" PENDING = "pending" @@ -37,12 +54,24 @@ class JobStatusResponse(BaseModel): class ExtractionResult(BaseModel): """Résultat de l'extraction d'informations d'un chèque""" - montant: Optional[str] = Field(None, description="Montant du chèque") + # Champs standards + montant: Optional[str] = Field(None, description="Montant du chèque en chiffres") date: Optional[str] = Field(None, description="Date du chèque") beneficiaire: Optional[str] = Field(None, description="Bénéficiaire du chèque") numero_cheque: Optional[str] = Field(None, description="Numéro du chèque") qualite_extraction: Optional[str] = Field(None, description="Qualité de l'extraction (échec, faible, moyenne, bonne)") image_zones: Optional[str] = Field(None, description="Chemin vers l'image avec les zones identifiées") + + # Champs MICR (CMC-7) + code_banque: Optional[str] = Field(None, description="Code banque (MICR CMC-7)") + code_guichet: Optional[str] = Field(None, description="Code guichet (MICR CMC-7)") + numero_compte: Optional[str] = Field(None, description="Numéro de compte (MICR CMC-7)") + cle_rib: Optional[str] = Field(None, description="Clé RIB (MICR CMC-7)") + sequence_micr: Optional[str] = Field(None, description="Séquence MICR complète") + + # Champs avancés + montant_lettres: Optional[str] = Field(None, description="Montant du chèque en lettres") + coherence_montants: Optional[bool] = Field(None, description="Cohérence entre montant en chiffres et en lettres") class JobResult(BaseModel): diff --git a/docker-compose.yml b/docker-compose.yml index 3c135b4..469712b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -23,7 +23,7 @@ services: networks: - cheque-scanner-network - worker: + worker1: build: ./worker depends_on: - redis @@ -38,11 +38,35 @@ services: - TEMP_FOLDER=/app/data/tmp - QUEUE_NAME=cheque_processing - HIGH_PRIORITY_QUEUE_NAME=cheque_processing_high - - TESSERACT_DATA_PATH=/usr/share/tesseract-ocr/4.00/tessdata - - DEFAULT_OCR_LANGUAGE=eng - - ALTERNATIVE_OCR_LANGUAGE=fra - deploy: - replicas: ${WORKER_REPLICAS:-2} + - TESSERACT_DATA_PATH=/usr/share/tesseract-ocr/5/tessdata + - DEFAULT_OCR_LANGUAGE=fra + - ALTERNATIVE_OCR_LANGUAGE=eng + - WORKER_NAME=worker-1 + command: ["python", "worker.py", "--queues", "cheque_processing", "--name", "worker-1"] + restart: unless-stopped + networks: + - cheque-scanner-network + + worker2: + build: ./worker + depends_on: + - redis + volumes: + - shared_data:/app/data + - ./shared:/app/shared + - ./api/app:/app/api_app + environment: + - REDIS_URL=redis://redis:6379/0 + - UPLOAD_FOLDER=/app/data/uploads + - RESULT_FOLDER=/app/data/results + - TEMP_FOLDER=/app/data/tmp + - QUEUE_NAME=cheque_processing + - HIGH_PRIORITY_QUEUE_NAME=cheque_processing_high + - TESSERACT_DATA_PATH=/usr/share/tesseract-ocr/5/tessdata + - DEFAULT_OCR_LANGUAGE=fra + - ALTERNATIVE_OCR_LANGUAGE=eng + - WORKER_NAME=worker-2 + command: ["python", "worker.py", "--queues", "cheque_processing", "--name", "worker-2"] restart: unless-stopped networks: - cheque-scanner-network @@ -62,9 +86,9 @@ services: - RESULT_FOLDER=/app/data/results - TEMP_FOLDER=/app/data/tmp - QUEUE_NAME=cheque_processing_high - - TESSERACT_DATA_PATH=/usr/share/tesseract-ocr/4.00/tessdata - - DEFAULT_OCR_LANGUAGE=eng - - ALTERNATIVE_OCR_LANGUAGE=fra + - TESSERACT_DATA_PATH=/usr/share/tesseract-ocr/5/tessdata + - DEFAULT_OCR_LANGUAGE=fra + - ALTERNATIVE_OCR_LANGUAGE=eng - WORKER_NAME=priority-worker entrypoint: ["/priority_entrypoint.sh"] command: ["python", "worker.py", "--queues", "cheque_processing_high", "--name", "priority-worker"] diff --git a/shared/extraction/scanner.py b/shared/extraction/scanner.py index f0d222d..932c3db 100644 --- a/shared/extraction/scanner.py +++ b/shared/extraction/scanner.py @@ -11,8 +11,10 @@ import cv2 import numpy as np import tempfile import logging +import shutil # Utilisé mais non importé, ajouté +import math from pathlib import Path -from typing import Dict, Tuple, Optional, Any, Union +from typing import Dict, Tuple, Optional, Any, Union, List # Configuration du logging logging.basicConfig( @@ -23,9 +25,9 @@ logger = logging.getLogger("cheque_scanner") def extraire_infos_cheque( - chemin_image: str, - methode: str = "ocr", - language: str = "eng", + chemin_image: str, + methode: str = "hybride_avance", + language: str = "fra", tessdata: Optional[str] = None ) -> Tuple[Dict[str, Any], str]: """ @@ -33,8 +35,13 @@ def extraire_infos_cheque( Args: chemin_image: Chemin vers l'image du chèque - methode: "ocr" (utilise PyMuPDF+Tesseract) ou "cv" (utilise OpenCV sans OCR) - language: Code de langue pour OCR (eng par défaut, utiliser fra pour français si disponible) + methode: "ocr", "cv", "micr", "hybride" ou "hybride_avance" + - "ocr": Utilise PyMuPDF+Tesseract + - "cv": Utilise OpenCV sans OCR + - "micr": Extraction spécialisée des caractères MICR CMC-7 + - "hybride": Combine MICR et OCR standard + - "hybride_avance": Combine MICR, OCR et extraction du montant en lettres + language: Code de langue pour OCR (fra par défaut pour les chèques français) tessdata: Chemin vers le dossier tessdata (optionnel) Returns: @@ -45,7 +52,64 @@ def extraire_infos_cheque( logger.info(f"Traitement de l'image: {chemin_image}") - if methode == "ocr": + # Méthode hybride avancée (OCR + MICR + montant en lettres) + if methode == "hybride_avance": + try: + logger.info("Utilisation de la méthode hybride avancée") + + # 1. Extraire les infos MICR (code banque, numéro de compte, etc.) + infos_micr, texte_micr = extraire_par_micr_cmc7(chemin_image) + + # 2. Extraire les informations générales par OCR + infos_ocr, texte_ocr = extraire_par_ocr(chemin_image, language, tessdata) + + # 3. Extraire spécifiquement le montant en lettres + montant_lettres = extraire_montant_en_lettres(chemin_image) + + # 4. Fusionner les résultats + infos = {**infos_ocr, **infos_micr} + + # Ajouter le montant en lettres + if montant_lettres: + infos["montant_lettres"] = montant_lettres + + # 5. Validation croisée montant en chiffres vs lettres + if infos.get("montant") and montant_lettres: + coherence = verifier_coherence_montants(infos["montant"], montant_lettres) + infos["coherence_montants"] = coherence + + # Texte complet + texte_complet = ( + texte_ocr + + "\n--- MONTANT EN LETTRES ---\n" + (montant_lettres or "Non détecté") + + "\n--- MICR CMC-7 ---\n" + texte_micr + ) + + # Ajouter la méthode utilisée + infos["methode"] = "hybride_avance" + + return infos, texte_complet + + except Exception as e: + logger.error(f"Erreur avec la méthode hybride avancée: {e}") + logger.info("Fallback vers la méthode hybride standard...") + try: + return extraire_par_hybride(chemin_image, language, tessdata) + except Exception as e2: + logger.error(f"Erreur avec la méthode hybride: {e2}") + logger.info("Fallback vers OCR standard...") + return extraire_par_ocr(chemin_image, language, tessdata) + + # Méthode hybride standard (MICR + OCR) + elif methode == "hybride": + return extraire_par_hybride(chemin_image, language, tessdata) + + # Méthode MICR uniquement + elif methode == "micr": + return extraire_par_micr_cmc7(chemin_image) + + # Méthode OCR uniquement + elif methode == "ocr": try: logger.info("Utilisation de la méthode OCR") return extraire_par_ocr(chemin_image, language, tessdata) @@ -53,11 +117,55 @@ def extraire_infos_cheque( logger.error(f"Erreur avec OCR: {e}") logger.info("Tentative d'extraction par traitement d'image...") return extraire_par_cv(chemin_image) + + # Méthode CV uniquement else: logger.info("Utilisation de la méthode de traitement d'image sans OCR") return extraire_par_cv(chemin_image) +def extraire_par_hybride( + chemin_image: str, + language: str = "fra", + tessdata: Optional[str] = None +) -> Tuple[Dict[str, Any], str]: + """ + Extraction hybride combinant MICR CMC-7 et OCR standard + + Args: + chemin_image: Chemin vers l'image du chèque + language: Code de langue pour OCR + tessdata: Chemin vers le dossier tessdata (optionnel) + + Returns: + Tuple (infos, texte) où infos est un dictionnaire et texte le texte brut extrait + """ + logger.info(f"Extraction hybride pour: {chemin_image}") + + try: + # 1. Extraire les infos MICR (code banque, numéro de compte, etc.) + infos_micr, texte_micr = extraire_par_micr_cmc7(chemin_image) + + # 2. Extraire les informations générales par OCR + infos_ocr, texte_ocr = extraire_par_ocr(chemin_image, language, tessdata) + + # 3. Fusionner les résultats, en privilégiant MICR pour les numéros + infos = {**infos_ocr, **infos_micr} + + # Texte complet + texte_complet = texte_ocr + "\n--- MICR CMC-7 ---\n" + texte_micr + + # Ajouter la méthode utilisée + infos["methode"] = "hybride" + + return infos, texte_complet + + except Exception as e: + logger.error(f"Erreur avec la méthode hybride: {e}") + logger.info("Fallback vers OCR standard...") + return extraire_par_ocr(chemin_image, language, tessdata) + + def extraire_par_ocr( chemin_image: str, language: str = "eng", @@ -74,36 +182,135 @@ def extraire_par_ocr( # 3. Améliorer la qualité de l'image pour l'OCR # Convertir en format OpenCV pour le prétraitement - img = np.frombuffer(pixmap.samples, dtype=np.uint8).reshape(pixmap.h, pixmap.w, 3) + # Vérifier le nombre de canaux pour éviter les erreurs de reshape + try: + # Obtenir des informations détaillées sur le pixmap + n_channels = pixmap.n + logger.debug(f"Pixmap info: h={pixmap.h}, w={pixmap.w}, n={n_channels}, stride={pixmap.stride}") + + # Vérifier si les dimensions correspondent au nombre d'échantillons + expected_size = pixmap.h * pixmap.w * n_channels + actual_size = len(pixmap.samples) + + if expected_size != actual_size: + logger.warning(f"Taille d'échantillon incorrecte: attendu {expected_size}, obtenu {actual_size}") + raise ValueError("Dimension mismatch") + + # Tenter le reshape standard + if n_channels in [1, 3, 4]: # Niveaux de gris, RGB ou RGBA + img = np.frombuffer(pixmap.samples, dtype=np.uint8).reshape(pixmap.h, pixmap.w, n_channels) + if n_channels == 4: # Si RGBA, convertir en RGB + img = cv2.cvtColor(img, cv2.COLOR_RGBA2RGB) + elif n_channels == 1: # Si grayscale, convertir en RGB pour traitement uniforme + img = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB) + else: + raise ValueError(f"Nombre de canaux inattendu: {n_channels}") + + except Exception as e: + logger.warning(f"Erreur lors de la conversion du pixmap: {e}") + # Méthode alternative: sauvegarder et recharger l'image + with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp_file: + tmp_path = tmp_file.name + pixmap.save(tmp_path) + + logger.info(f"Utilisation de la méthode alternative: chargement via OpenCV depuis {tmp_path}") + img = cv2.imread(tmp_path) + if img is None: + # Si l'image ne peut pas être chargée, essayer de la charger en niveaux de gris + img = cv2.imread(tmp_path, cv2.IMREAD_GRAYSCALE) + if img is None: + raise ValueError(f"Impossible de charger l'image: {tmp_path}") + # Convertir en RGB pour le traitement standard + img = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB) + + # Conserver le fichier temporaire pour le débug si nécessaire + debug_tmp_path = os.path.join(os.path.dirname(chemin_image), f"debug_tmp_{os.path.basename(chemin_image)}") + shutil.copy(tmp_path, debug_tmp_path) + logger.info(f"Image temporaire sauvegardée pour debug: {debug_tmp_path}") + + # Nettoyage du fichier temporaire original + os.unlink(tmp_path) - # Prétraitement pour améliorer la détection de texte + # Prétraitement avancé pour améliorer la détection de texte + # 1. Conversion en niveaux de gris gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY) - blurred = cv2.GaussianBlur(gray, (5, 5), 0) - _, thresh = cv2.threshold(blurred, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) + + # 2. Amélioration du contraste par égalisation d'histogramme + clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)) + enhanced = clahe.apply(gray) + + # 3. Réduction du bruit + blurred = cv2.GaussianBlur(enhanced, (5, 5), 0) + + # 4. Binarisation adaptative pour mieux préserver les détails du texte + thresh = cv2.adaptiveThreshold(blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, + cv2.THRESH_BINARY, 11, 2) + + # 5. Opérations morphologiques pour nettoyer l'image + kernel = np.ones((1, 1), np.uint8) + thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel) # Sauvegarder l'image prétraitée temporairement with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp_file: tmp_path = tmp_file.name cv2.imwrite(tmp_path, thresh) - # Charger l'image prétraitée - pixmap_processed = fitz.Pixmap(tmp_path) - + try: + # Charger l'image prétraitée + pixmap_processed = fitz.Pixmap(tmp_path) + + # Vérifier la validité du pixmap + if pixmap_processed.width == 0 or pixmap_processed.height == 0: + raise ValueError("Pixmap invalide après prétraitement") + + # Debug: sauvegarder une copie de l'image prétraitée pour inspection + debug_path = os.path.join(os.path.dirname(chemin_image), f"debug_preprocess_{os.path.basename(chemin_image)}") + pixmap_processed.save(debug_path) + logger.info(f"Image prétraitée sauvegardée pour debug: {debug_path}") + except Exception as e: + logger.error(f"Erreur lors du chargement de l'image prétraitée: {e}") + raise e + try: # 4. Effectuer l'OCR et générer un PDF avec une couche de texte - if tessdata: - # Si un chemin vers tessdata est fourni - logger.debug(f"Utilisation du dossier tessdata personnalisé: {tessdata}") - pdf_ocr = pixmap_processed.pdfocr_tobytes(language=language, tessdata=tessdata) + try: + if tessdata: + # Si un chemin vers tessdata est fourni + logger.debug(f"Utilisation du dossier tessdata personnalisé: {tessdata}") + pdf_ocr = pixmap_processed.pdfocr_tobytes(language=language, tessdata=tessdata) + else: + # Essai avec la configuration par défaut + logger.debug(f"Utilisation du dossier tessdata par défaut") + pdf_ocr = pixmap_processed.pdfocr_tobytes(language=language) + except Exception as ocr_error: + logger.warning(f"Erreur lors de l'OCR avec PyMuPDF: {ocr_error}") + # Essai avec Tesseract directement via un fichier temporaire + import pytesseract + + logger.info("Utilisation de pytesseract directement") + # L'image prétraitée est déjà sauvegardée dans tmp_path + texte = pytesseract.image_to_string(tmp_path, lang=language) + + # Simuler un document PDF pour maintenir la cohérence du code + class MockPage: + def get_text(self): + return texte + + class MockDoc: + def __init__(self): + self.pages = [MockPage()] + def __getitem__(self, idx): + return self.pages[0] + def close(self): + pass + + doc = MockDoc() + page = doc[0] else: - # Essai avec la configuration par défaut - logger.debug(f"Utilisation du dossier tessdata par défaut") - pdf_ocr = pixmap_processed.pdfocr_tobytes(language=language) - - # 5. Charger le PDF OCR et extraire le texte - doc = fitz.open("pdf", pdf_ocr) - page = doc[0] - texte = page.get_text() + # 5. Charger le PDF OCR et extraire le texte (seulement si la méthode PyMuPDF a réussi) + doc = fitz.open("pdf", pdf_ocr) + page = doc[0] + texte = page.get_text() # 6. Analyser le texte pour trouver les informations pertinentes infos = { @@ -134,73 +341,1011 @@ def extraire_par_ocr( def extraire_par_cv(chemin_image: str) -> Tuple[Dict[str, Any], str]: """Extraction par traitement d'image avec OpenCV sans OCR""" - # Charger l'image - img = cv2.imread(chemin_image) + logger.info(f"Extraction par traitement d'image pour: {chemin_image}") - if img is None: - raise ValueError(f"Impossible de charger l'image: {chemin_image}") + try: + # Charger l'image avec différentes méthodes si nécessaire + img = cv2.imread(chemin_image) + + if img is None: + # Essayer de charger l'image en niveaux de gris + logger.warning(f"Impossible de charger l'image en couleur, tentative en niveaux de gris") + img = cv2.imread(chemin_image, cv2.IMREAD_GRAYSCALE) + if img is None: + # Dernière tentative: utiliser PIL/Pillow + logger.warning("Tentative de chargement via PIL/Pillow") + try: + from PIL import Image + import numpy as np + + pil_img = Image.open(chemin_image) + img = np.array(pil_img) + if len(img.shape) == 2: # Image en niveaux de gris + img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR) + elif img.shape[2] == 4: # Image RGBA + img = cv2.cvtColor(img, cv2.COLOR_RGBA2BGR) + except Exception as pil_error: + logger.error(f"Erreur avec PIL: {pil_error}") + raise ValueError(f"Impossible de charger l'image: {chemin_image}") + + if img is None: + raise ValueError(f"Impossible de charger l'image: {chemin_image} avec aucune méthode") + + # Enregistrer les dimensions de l'image pour le diagnostic + height, width = img.shape[:2] + logger.info(f"Dimensions de l'image: {width}x{height}") + + # Prétraitement avancé + # Conversion en niveaux de gris si nécessaire + if len(img.shape) == 3: # Image en couleur + gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) + else: # Déjà en niveaux de gris + gray = img + + # Amélioration du contraste + clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)) + enhanced = clahe.apply(gray) + + # Réduction du bruit + blurred = cv2.GaussianBlur(enhanced, (5, 5), 0) + + # Binarisation adaptative + thresh = cv2.adaptiveThreshold(blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, + cv2.THRESH_BINARY_INV, 11, 2) + + # Opérations morphologiques pour améliorer la détection + kernel = np.ones((3, 3), np.uint8) + thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel) + thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel) + + # Enregistrer l'image prétraitée pour diagnostic + preproc_path = f"pretraitement_{os.path.basename(chemin_image)}" + cv2.imwrite(preproc_path, thresh) + logger.info(f"Image prétraitée enregistrée: {preproc_path}") + + # Trouver les contours (rectangles potentiels des zones importantes) + contours, hierarchy = cv2.findContours(thresh, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) + + # Filtrer les contours pour ne garder que ceux qui pourraient être des zones d'intérêt + filtered_contours = [] + for contour in contours: + area = cv2.contourArea(contour) + # Ne garder que les contours d'une certaine taille + if area > 500 and area < (width * height * 0.5): + # Calculer le rectangle englobant + x, y, w, h = cv2.boundingRect(contour) + # Filtrer par ratio d'aspect pour éviter les lignes/bandes trop étroites + aspect_ratio = float(w) / h if h > 0 else 0 + if 0.1 < aspect_ratio < 10: + filtered_contours.append(contour) + + logger.info(f"Nombre de contours détectés: {len(contours)}, filtrés: {len(filtered_contours)}") + + # Utiliser des contours détectés si possible, sinon utiliser des zones prédéfinies + if len(filtered_contours) >= 3: + # Trier les contours par position (haut vers bas) + filtered_contours = sorted(filtered_contours, key=lambda c: cv2.boundingRect(c)[1]) + + # Utiliser des heuristiques pour déterminer les zones importantes + + # Pour la date (généralement en haut) + top_contours = [c for c in filtered_contours if cv2.boundingRect(c)[1] < height * 0.3] + if top_contours: + # Préférer les contours en haut à droite pour la date + top_right_contours = sorted(top_contours, key=lambda c: width - (cv2.boundingRect(c)[0] + cv2.boundingRect(c)[2])) + if top_right_contours: + x, y, w, h = cv2.boundingRect(top_right_contours[0]) + zone_date = (x, y, x + w, y + h) + else: + x, y, w, h = cv2.boundingRect(top_contours[0]) + zone_date = (x, y, x + w, y + h) + else: + # Zone par défaut pour la date + zone_date = (int(width*0.7), int(height*0.1), int(width*0.95), int(height*0.25)) + + # Pour le bénéficiaire (généralement au milieu) + middle_contours = [c for c in filtered_contours if height * 0.2 < cv2.boundingRect(c)[1] < height * 0.6] + if middle_contours: + # Trouver le plus grand contour dans la zone du milieu + largest_middle = max(middle_contours, key=cv2.contourArea) + x, y, w, h = cv2.boundingRect(largest_middle) + zone_beneficiaire = (x, y, x + w, y + h) + else: + # Zone par défaut pour le bénéficiaire + zone_beneficiaire = (int(width*0.2), int(height*0.25), int(width*0.8), int(height*0.4)) + + # Pour le montant (généralement en bas à droite) + bottom_contours = [c for c in filtered_contours if cv2.boundingRect(c)[1] > height * 0.5] + if bottom_contours: + # Préférer les contours en bas à droite pour le montant + bottom_right_contours = sorted(bottom_contours, key=lambda c: -cv2.boundingRect(c)[0]) + if bottom_right_contours: + x, y, w, h = cv2.boundingRect(bottom_right_contours[0]) + zone_montant = (x, y, x + w, y + h) + else: + x, y, w, h = cv2.boundingRect(bottom_contours[-1]) + zone_montant = (x, y, x + w, y + h) + else: + # Zone par défaut pour le montant + zone_montant = (int(width*0.6), int(height*0.35), int(width*0.95), int(height*0.55)) + + logger.info("Zones détectées basées sur analyse des contours") + else: + # Estimation des zones d'intérêt basée sur la position relative + logger.info("Utilisation des zones prédéfinies (pas assez de contours détectés)") + + # Zone approximative pour le montant (généralement en bas à droite) + zone_montant = (int(width*0.6), int(height*0.35), int(width*0.95), int(height*0.55)) + + # Zone approximative pour la date (généralement en haut à droite) + zone_date = (int(width*0.7), int(height*0.1), int(width*0.95), int(height*0.25)) + + # Zone approximative pour le bénéficiaire (généralement au milieu) + zone_beneficiaire = (int(width*0.2), int(height*0.25), int(width*0.8), int(height*0.4)) + + # Créer une image avec les zones identifiées et les contours + img_zones = img.copy() + + # Dessiner tous les contours filtrés en jaune + cv2.drawContours(img_zones, filtered_contours, -1, (0, 255, 255), 1) + + # Dessiner les zones principales avec étiquettes + cv2.rectangle(img_zones, (zone_montant[0], zone_montant[1]), + (zone_montant[2], zone_montant[3]), (0, 255, 0), 2) + cv2.putText(img_zones, "Montant", (zone_montant[0], zone_montant[1]-5), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2) + + cv2.rectangle(img_zones, (zone_date[0], zone_date[1]), + (zone_date[2], zone_date[3]), (255, 0, 0), 2) + cv2.putText(img_zones, "Date", (zone_date[0], zone_date[1]-5), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2) + + cv2.rectangle(img_zones, (zone_beneficiaire[0], zone_beneficiaire[1]), + (zone_beneficiaire[2], zone_beneficiaire[3]), (0, 0, 255), 2) + cv2.putText(img_zones, "Bénéficiaire", (zone_beneficiaire[0], zone_beneficiaire[1]-5), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2) + + # Générer un nom de fichier unique pour l'image avec les zones identifiées + image_result_path = f"zones_identifiees_{os.path.basename(chemin_image)}" + cv2.imwrite(image_result_path, img_zones) + logger.info(f"Image avec zones identifiées enregistrée sous: {image_result_path}") + + # Tenter d'extraire le texte des zones avec OCR si pytesseract est disponible + try: + import pytesseract + logger.info("Tentative d'OCR des zones avec pytesseract") + + # Extraire les sous-images pour chaque zone + roi_montant = gray[zone_montant[1]:zone_montant[3], zone_montant[0]:zone_montant[2]] + roi_date = gray[zone_date[1]:zone_date[3], zone_date[0]:zone_date[2]] + roi_beneficiaire = gray[zone_beneficiaire[1]:zone_beneficiaire[3], zone_beneficiaire[0]:zone_beneficiaire[2]] + + # OCR sur chaque zone + texte_montant = pytesseract.image_to_string(roi_montant, lang='fra').strip() + texte_date = pytesseract.image_to_string(roi_date, lang='fra').strip() + texte_beneficiaire = pytesseract.image_to_string(roi_beneficiaire, lang='fra').strip() + + logger.info(f"OCR zones - Montant: '{texte_montant}', Date: '{texte_date}', Bénéficiaire: '{texte_beneficiaire}'") + + # Informations extraites avec OCR des zones + infos = { + "montant": texte_montant if texte_montant else f"Zone identifiée, voir {image_result_path} (vert)", + "date": texte_date if texte_date else f"Zone identifiée, voir {image_result_path} (bleu)", + "beneficiaire": texte_beneficiaire if texte_beneficiaire else f"Zone identifiée, voir {image_result_path} (rouge)", + "numero_cheque": "Non détecté", + "image_zones": image_result_path, + "methode": "cv+ocr_zones" + } + + texte_complet = f"Montant: {texte_montant}\nDate: {texte_date}\nBénéficiaire: {texte_beneficiaire}" + + return infos, texte_complet + + except Exception as ocr_error: + logger.warning(f"Échec de l'OCR des zones: {ocr_error}") + + # Sans OCR, retourner les zones identifiées + infos = { + "montant": f"Zone identifiée, voir {image_result_path} (vert)", + "date": f"Zone identifiée, voir {image_result_path} (bleu)", + "beneficiaire": f"Zone identifiée, voir {image_result_path} (rouge)", + "numero_cheque": "Non détecté sans OCR", + "image_zones": image_result_path, + "methode": "cv_only" + } + + return infos, f"Texte non disponible sans OCR - visualisation des zones générée dans {image_result_path}" + + except Exception as e: + logger.error(f"Erreur lors du traitement de l'image: {str(e)}") + # Créer une image basique avec un message d'erreur + error_img = np.ones((300, 600, 3), dtype=np.uint8) * 255 + cv2.putText(error_img, f"ERREUR: {str(e)}", (50, 150), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2) + image_result_path = f"erreur_{os.path.basename(chemin_image)}" + cv2.imwrite(image_result_path, error_img) + + # Informations minimales en cas d'erreur + infos = { + "montant": "Erreur de traitement", + "date": "Erreur de traitement", + "beneficiaire": "Erreur de traitement", + "numero_cheque": "Erreur de traitement", + "image_zones": image_result_path, + "erreur": str(e) + } + + return infos, f"Erreur lors du traitement de l'image: {str(e)}" + + +# Fonctions d'extraction MICR et montant en lettres +def extraire_par_micr_cmc7(chemin_image: str) -> Tuple[Dict[str, Any], str]: + """ + Extraction spécialisée pour les caractères CMC-7 au bas des chèques européens - # Prétraitement - gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) - blurred = cv2.GaussianBlur(gray, (5, 5), 0) - thresh = cv2.adaptiveThreshold(blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, - cv2.THRESH_BINARY_INV, 11, 2) + Args: + chemin_image: Chemin vers l'image du chèque - # Trouver les contours (rectangles potentiels des zones importantes) - contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + Returns: + Tuple (infos, texte) où infos contient les informations bancaires structurées + """ + logger.info(f"Extraction MICR CMC-7 pour: {chemin_image}") - # Trier les contours par taille (du plus grand au plus petit) - contours = sorted(contours, key=cv2.contourArea, reverse=True) - - # Estimation des zones d'intérêt basée sur la position relative - # (ceci est approximatif et dépend du format exact du chèque) + try: + # Chargement et validation de l'image + img = cv2.imread(chemin_image) + if img is None: + raise ValueError(f"Impossible de charger l'image: {chemin_image}") + + # 1. Localisation de la bande MICR (tiers inférieur du chèque) + bande_micr = localiser_bande_micr_cmc7(img) + + # Sauvegarder la bande MICR pour diagnostic + debug_path = f"debug_micr_band_{os.path.basename(chemin_image)}" + cv2.imwrite(debug_path, bande_micr) + logger.info(f"Bande MICR sauvegardée pour debug: {debug_path}") + + # 2. Prétraitement spécifique pour CMC-7 + bande_preprocessed = pretraiter_cmc7(bande_micr) + + # Sauvegarder l'image prétraitée + debug_preproc_path = f"debug_micr_preproc_{os.path.basename(chemin_image)}" + cv2.imwrite(debug_preproc_path, bande_preprocessed) + + # 3. Segmentation des caractères individuels + caracteres = segmenter_caracteres_cmc7(bande_preprocessed) + logger.info(f"Nombre de caractères CMC-7 segmentés: {len(caracteres)}") + + # 4. Reconnaissance des caractères CMC-7 + sequence = reconnaitre_sequence_cmc7(caracteres) + logger.info(f"Séquence MICR reconnue: {sequence}") + + # 5. Extraction des informations structurées + infos = extraire_infos_cmc7(sequence) + + return infos, f"Séquence MICR: {sequence}" + + except Exception as e: + logger.error(f"Erreur lors de l'extraction MICR: {str(e)}") + return { + "erreur_micr": str(e), + "methode": "micr_cmc7_failed" + }, f"Erreur lors de l'extraction MICR: {str(e)}" + + +def localiser_bande_micr_cmc7(img): + """ + Localise la bande MICR en bas du chèque + Le CMC-7 est généralement dans le tiers inférieur + """ height, width = img.shape[:2] - # Zone approximative pour le montant (généralement en bas à droite) - zone_montant = (int(width*0.6), int(height*0.35), int(width*0.95), int(height*0.55)) + # Focalisation sur le tiers inférieur où se trouve typiquement la ligne MICR + lower_third = img[int(height*0.65):height, 0:width] - # Zone approximative pour la date (généralement en haut à droite) - zone_date = (int(width*0.7), int(height*0.1), int(width*0.95), int(height*0.25)) + # Conversion en niveaux de gris si nécessaire + gray = cv2.cvtColor(lower_third, cv2.COLOR_BGR2GRAY) if len(lower_third.shape) > 2 else lower_third - # Zone approximative pour le bénéficiaire (généralement au milieu) - zone_beneficiaire = (int(width*0.2), int(height*0.25), int(width*0.8), int(height*0.4)) + # Binarisation adaptative pour mettre en évidence les caractères + binary = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, + cv2.THRESH_BINARY_INV, 19, 3) - # Pour la démonstration, créer une image avec les zones identifiées - img_zones = img.copy() - cv2.rectangle(img_zones, (zone_montant[0], zone_montant[1]), - (zone_montant[2], zone_montant[3]), (0, 255, 0), 2) - cv2.rectangle(img_zones, (zone_date[0], zone_date[1]), - (zone_date[2], zone_date[3]), (255, 0, 0), 2) - cv2.rectangle(img_zones, (zone_beneficiaire[0], zone_beneficiaire[1]), - (zone_beneficiaire[2], zone_beneficiaire[3]), (0, 0, 255), 2) + # Opérations morphologiques pour connecter les composants des caractères + kernel = np.ones((3, 9), np.uint8) # Noyau horizontal pour connecter les barres du CMC-7 + connected = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel) - # Générer un nom de fichier unique pour l'image avec les zones identifiées - image_result_path = f"zones_identifiees_{os.path.basename(chemin_image)}" - cv2.imwrite(image_result_path, img_zones) - logger.info(f"Image avec zones identifiées enregistrée sous: {image_result_path}") + # Détection des composantes connexes (candidats potentiels) + contours, _ = cv2.findContours(connected, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) - # Sans OCR, nous ne pouvons pas extraire le texte directement, - # mais nous pouvons indiquer les zones où se trouvent les informations + # Filtrage des contours par taille pour trouver la ligne MICR + filtered_contours = [] + for contour in contours: + x, y, w, h = cv2.boundingRect(contour) + aspect_ratio = w / h + area = cv2.contourArea(contour) + + # Filtres pour identifier la ligne MICR: + # - Largeur significative (une ligne de caractères) + # - Hauteur dans une plage typique pour CMC-7 + # - Ratio d'aspect élevé (c'est une ligne) + if w > width*0.3 and 10 < h < 50 and aspect_ratio > 5: + filtered_contours.append(contour) + + # Si aucun contour correspondant n'est trouvé, utiliser une approche heuristique + if not filtered_contours: + logger.warning("Bande MICR non détectée par analyse de contours, utilisation d'heuristique") + y_start = int(gray.shape[0] * 0.5) # Milieu du tiers inférieur + y_end = int(gray.shape[0] * 0.8) # 80% vers le bas du tiers inférieur + return lower_third[y_start:y_end, 0:width] + + # Trier par position verticale (de haut en bas) + filtered_contours = sorted(filtered_contours, key=lambda c: cv2.boundingRect(c)[1]) + + # Prendre le dernier contour (probablement la ligne MICR en bas) + best_contour = filtered_contours[-1] + x, y, w, h = cv2.boundingRect(best_contour) + + # Extraire la région avec une marge + margin = int(h * 0.4) + y_start = max(0, y - margin) + y_end = min(lower_third.shape[0], y + h + margin) + + micr_line = lower_third[y_start:y_end, 0:width] + + return micr_line + + +def pretraiter_cmc7(image): + """ + Prétraitement spécifique pour les caractères CMC-7 + Optimisé pour faire ressortir les barres verticales caractéristiques + """ + # Conversion en niveaux de gris si nécessaire + gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) > 2 else image + + # Amélioration du contraste + clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)) + enhanced = clahe.apply(gray) + + # Réduction du bruit + denoised = cv2.fastNlMeansDenoising(enhanced, None, 13, 7, 21) + + # Binarisation adaptative adaptée au CMC-7 + binary = cv2.adaptiveThreshold(denoised, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, + cv2.THRESH_BINARY_INV, 19, 5) + + # Opérations morphologiques pour nettoyer l'image + kernel_v = np.ones((7, 1), np.uint8) # Noyau vertical pour renforcer les barres + morphed = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel_v) + morphed = cv2.morphologyEx(morphed, cv2.MORPH_OPEN, np.ones((1, 2), np.uint8)) + + return morphed + + +def segmenter_caracteres_cmc7(image_preprocessed): + """ + Segmente les caractères individuels du CMC-7 + Les caractères CMC-7 ont un espacement régulier et une largeur constante + """ + # Trouver les contours des caractères potentiels + contours, _ = cv2.findContours(image_preprocessed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + + # Filtrer et trier les contours par position horizontale (gauche à droite) + char_contours = [] + for contour in contours: + x, y, w, h = cv2.boundingRect(contour) + # Filtrer par taille typique d'un caractère CMC-7 + if 5 < w < 40 and h > 10: + char_contours.append((x, y, w, h)) + + # Trier les contours de gauche à droite + char_contours.sort(key=lambda c: c[0]) + + # Extraire chaque caractère + caracteres = [] + for x, y, w, h in char_contours: + # Ajouter une marge autour du caractère + margin_x = int(w * 0.1) + margin_y = int(h * 0.1) + + # Calculer les coordonnées avec marge + x1 = max(0, x - margin_x) + y1 = max(0, y - margin_y) + x2 = min(image_preprocessed.shape[1], x + w + margin_x) + y2 = min(image_preprocessed.shape[0], y + h + margin_y) + + # Extraire le caractère + char_img = image_preprocessed[y1:y2, x1:x2] + + # Normaliser la taille pour la reconnaissance + char_img_resized = cv2.resize(char_img, (28, 40)) + + caracteres.append(char_img_resized) + + return caracteres + + +def reconnaitre_sequence_cmc7(caracteres): + """ + Reconnaissance basique des caractères CMC-7 segmentés + Dans cette version, nous utilisons une approche simplifiée qui identifie + principalement les chiffres et retourne la séquence brute + """ + # Dans cette implémentation de base, nous simulons la reconnaissance + # Une implémentation complète nécessiterait soit: + # 1. Un ensemble de templates pour template matching + # 2. Un classificateur ML entraîné pour les caractères CMC-7 + + sequence = "" + separateurs = ["S1", "S2", "S3", "S4", "S5"] + sep_count = 0 + + for i, char_img in enumerate(caracteres): + # Calculer le nombre de pixels blancs (approximation de la densité) + pixel_density = np.sum(char_img) / 255.0 / (char_img.shape[0] * char_img.shape[1]) + + # Dans l'idéal, nous aurions un classificateur précis ici + # Pour cette implémentation de base, nous utilisons une heuristique simplifiée + # Les caractères de séparation ont généralement une densité différente + + # Si la densité est dans une certaine plage, c'est probablement un séparateur + if 0.4 < pixel_density < 0.6 and i % 7 == 6 and sep_count < 5: + char = separateurs[sep_count] + sep_count += 1 + else: + # Choisir un chiffre basé sur la densité (approximatif) + # Cela est très basique et devrait être remplacé par une vraie reconnaissance + digit_idx = min(9, int(pixel_density * 10)) + char = str(digit_idx) + + sequence += char + + return sequence + + +def extraire_infos_cmc7(sequence): + """ + Extrait les informations bancaires d'une séquence CMC-7 + Format français: [Code banque][S1][Code guichet][S2][Numéro de compte][S3][Clé RIB][S4][Numéro de chèque][S5] + """ + # Recherche des séparateurs (S1 à S5) + separateurs = ['S1', 'S2', 'S3', 'S4', 'S5'] + + # Initialiser les résultats infos = { - "montant": f"Zone identifiée, voir {image_result_path} (vert)", - "date": f"Zone identifiée, voir {image_result_path} (bleu)", - "beneficiaire": f"Zone identifiée, voir {image_result_path} (rouge)", - "numero_cheque": "Non détecté sans OCR", - "image_zones": image_result_path + "code_banque": None, + "code_guichet": None, + "numero_compte": None, + "cle_rib": None, + "numero_cheque": None, + "sequence_micr": sequence, + "format": "CMC-7" } - return infos, f"Texte non disponible sans OCR - visualisation des zones générée dans {image_result_path}" + # Si la séquence est trop courte, retourner des valeurs par défaut + if len(sequence) < 10: + logger.warning(f"Séquence MICR trop courte: {sequence}") + return infos + + # Essayer de découper la séquence selon les séparateurs + try: + positions = [] + current_pos = 0 + + # Chercher les positions des séparateurs + for sep in separateurs: + pos = sequence.find(sep, current_pos) + if pos != -1: + positions.append(pos) + current_pos = pos + len(sep) + else: + positions.append(-1) + + # Si tous les séparateurs sont trouvés + if all(pos != -1 for pos in positions): + infos["code_banque"] = sequence[:positions[0]] + infos["code_guichet"] = sequence[positions[0]+2:positions[1]] + infos["numero_compte"] = sequence[positions[1]+2:positions[2]] + infos["cle_rib"] = sequence[positions[2]+2:positions[3]] + infos["numero_cheque"] = sequence[positions[3]+2:positions[4]] + else: + # Découpage approximatif basé sur les longueurs typiques + # Code banque: 5 chiffres, Code guichet: 5 chiffres, N° compte: 11 chiffres, Clé RIB: 2 chiffres, N° chèque: 7 chiffres + s = sequence.replace("S1", "").replace("S2", "").replace("S3", "").replace("S4", "").replace("S5", "") + if len(s) >= 30: + infos["code_banque"] = s[:5] + infos["code_guichet"] = s[5:10] + infos["numero_compte"] = s[10:21] + infos["cle_rib"] = s[21:23] + infos["numero_cheque"] = s[23:30] if len(s) >= 30 else s[23:] + + except Exception as e: + logger.error(f"Erreur lors de l'extraction des infos MICR: {e}") + + # Mise à jour du numéro de chèque dans le format standard + if infos["numero_cheque"]: + infos["numero_cheque"] = infos["numero_cheque"].strip() + + return infos + + +def extraire_montant_en_lettres(chemin_image: str) -> Optional[str]: + """ + Extraction spécialisée du montant en lettres sur un chèque, + optimisée pour le texte manuscrit et les formulations françaises. + + Args: + chemin_image: Chemin vers l'image du chèque + + Returns: + Le montant en lettres extrait ou None si non détecté + """ + logger.info(f"Extraction du montant en lettres pour: {chemin_image}") + + try: + # Chargement de l'image + img = cv2.imread(chemin_image) + if img is None: + raise ValueError(f"Impossible de charger l'image: {chemin_image}") + + # 1. Localisation de la zone du montant en lettres + zone_montant_lettres = localiser_zone_montant_lettres(img) + + # Sauvegarder la zone pour diagnostic + debug_path = f"debug_montant_lettres_{os.path.basename(chemin_image)}" + cv2.imwrite(debug_path, zone_montant_lettres) + logger.info(f"Zone montant en lettres sauvegardée: {debug_path}") + + # 2. Prétraitement spécifique pour l'écriture manuscrite + image_preprocessed = pretraiter_ecriture_manuscrite(zone_montant_lettres) + + # Vérifier que l'image prétraitée n'est pas vide avant de la sauvegarder + if image_preprocessed is not None and not image_preprocessed.size == 0: + # Sauvegarder l'image prétraitée + debug_preproc_path = f"debug_montant_preproc_{os.path.basename(chemin_image)}" + try: + cv2.imwrite(debug_preproc_path, image_preprocessed) + logger.info(f"Image prétraitée sauvegardée: {debug_preproc_path}") + except Exception as e: + logger.warning(f"Impossible de sauvegarder l'image prétraitée: {e}") + else: + logger.warning("L'image prétraitée est vide, impossible de la sauvegarder") + + # 3. OCR avec optimisations pour le texte manuscrit + texte_brut = ocr_optimise_manuscrit(image_preprocessed) + logger.info(f"Texte OCR brut: {texte_brut}") + + # 4. Extraction du montant à partir du texte + montant_lettres = extraire_montant_depuis_texte(texte_brut) + + # 5. Validation linguistique du montant + montant_valide = valider_montant_lettres(montant_lettres) + logger.info(f"Montant en lettres extrait: {montant_valide}") + + return montant_valide + + except Exception as e: + logger.error(f"Erreur lors de l'extraction du montant en lettres: {e}") + return None + + +def localiser_zone_montant_lettres(img): + """ + Localise la zone contenant le montant écrit en lettres sur un chèque français. + Utilise une combinaison d'heuristiques de position et de détection de lignes. + """ + height, width = img.shape[:2] + + # La zone du montant en lettres est généralement dans le tiers supérieur du chèque + # après la zone du bénéficiaire + upper_mid = img[int(height*0.15):int(height*0.45), 0:width] + + # Conversion en niveaux de gris + gray = cv2.cvtColor(upper_mid, cv2.COLOR_BGR2GRAY) if len(upper_mid.shape) > 2 else upper_mid + + # Amélioration du contraste pour faire ressortir l'écriture + clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)) + enhanced = clahe.apply(gray) + + # Détection des lignes horizontales (lignes d'écriture) + edges = cv2.Canny(enhanced, 50, 150, apertureSize=3) + lines = cv2.HoughLinesP(edges, 1, np.pi/180, threshold=100, minLineLength=width*0.3, maxLineGap=20) + + # Filtrer et trier les lignes par position verticale + horizontal_lines = [] + if lines is not None: + for line in lines: + x1, y1, x2, y2 = line[0] + # Calculer l'angle pour ne garder que les lignes presque horizontales + angle = np.abs(np.arctan2(y2 - y1, x2 - x1) * 180 / np.pi) + if angle < 5 or angle > 175: # Presque horizontal + horizontal_lines.append((min(y1, y2), max(y1, y2))) + + # Si des lignes sont détectées, prendre celle qui correspond probablement + # à la ligne du montant en lettres (souvent la 2ème ou 3ème ligne horizontale) + if len(horizontal_lines) >= 3: + # Trier les lignes par position verticale + horizontal_lines.sort(key=lambda l: l[0]) + + # Prendre la ligne qui correspond généralement au montant en lettres + # (après la ligne du bénéficiaire) + line_idx = 1 # Souvent la deuxième ligne + y_min, y_max = horizontal_lines[line_idx] + + # Ajouter une marge + margin = int((y_max - y_min) * 1.0) + y_start = max(0, y_min - margin) + y_end = min(upper_mid.shape[0], y_max + margin) + + # S'assurer que y_end > y_start pour éviter une image vide + if y_end <= y_start: + y_end = min(y_start + 50, upper_mid.shape[0]) + + # Extraire la région + zone_montant = upper_mid[y_start:y_end, 0:width] + else: + # Fallback: utiliser une position heuristique si les lignes ne sont pas détectées + # Le montant en lettres est souvent entre 25% et 35% de la hauteur du chèque + y_start = int(upper_mid.shape[0] * 0.3) + y_end = int(upper_mid.shape[0] * 0.6) + + # S'assurer que y_end > y_start pour éviter une image vide + if y_end <= y_start: + y_end = min(y_start + 50, upper_mid.shape[0]) + + zone_montant = upper_mid[y_start:y_end, 0:width] + + # Vérifier que la zone extraite n'est pas vide + if zone_montant.size == 0 or zone_montant.shape[0] == 0 or zone_montant.shape[1] == 0: + logger.warning("Zone du montant en lettres vide, utilisation d'une zone par défaut") + # Utiliser une zone par défaut si la zone extraite est vide + y_start = int(height * 0.2) + y_end = int(height * 0.3) + zone_montant = img[y_start:y_end, 0:width] + + logger.info(f"Dimensions de la zone du montant en lettres: {zone_montant.shape}") + + return zone_montant + + +def pretraiter_ecriture_manuscrite(image): + """ + Prétraitement optimisé pour l'écriture manuscrite sur les chèques + """ + # Vérifier que l'image n'est pas vide + if image is None or image.size == 0 or image.shape[0] == 0 or image.shape[1] == 0: + logger.warning("Image vide ou invalide fournie pour le prétraitement manuscrit") + # Renvoyer une image noire minimale (1x1) pour éviter les erreurs + return np.zeros((100, 100), dtype=np.uint8) + + try: + # Vérifier que l'image a la bonne dimension et le bon type + if len(image.shape) < 2: + logger.warning(f"Image de forme invalide pour le prétraitement: {image.shape}") + return np.zeros((100, 100), dtype=np.uint8) + + # Conversion en niveaux de gris + gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) > 2 else image + + # Amélioration du contraste local pour faire ressortir l'écriture + clahe = cv2.createCLAHE(clipLimit=2.5, tileGridSize=(8, 8)) + enhanced = clahe.apply(gray) + + # Réduction du bruit tout en préservant les bords + denoised = cv2.bilateralFilter(enhanced, 9, 75, 75) + + # Binarisation adaptative optimisée pour l'écriture manuscrite + # Utiliser un seuil plus bas pour capturer les traits fins + binary = cv2.adaptiveThreshold(denoised, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, + cv2.THRESH_BINARY_INV, 15, 4) + + # Opérations morphologiques pour nettoyer et connecter les traits + kernel = np.ones((2, 2), np.uint8) + cleaned = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel) + + # Inversion pour avoir le texte en noir sur fond blanc (format attendu par Tesseract) + cleaned_inv = cv2.bitwise_not(cleaned) + + # Augmenter l'échelle de l'image (souvent aide l'OCR pour le manuscrit) + scaled = cv2.resize(cleaned_inv, None, fx=1.5, fy=1.5, interpolation=cv2.INTER_CUBIC) + + return scaled + + except Exception as e: + logger.error(f"Erreur lors du prétraitement de l'écriture manuscrite: {e}") + # Renvoyer une image noire minimale en cas d'erreur + return np.zeros((100, 100), dtype=np.uint8) + + +def ocr_optimise_manuscrit(image_preprocessed): + """ + OCR spécifiquement configuré pour le texte manuscrit + Utilise Tesseract avec des paramètres optimisés + """ + # Vérifier que l'image n'est pas vide ou invalide + if image_preprocessed is None: + logger.warning("Image vide fournie à l'OCR manuscrit") + return "" + + # Vérifier les dimensions de l'image + try: + if len(image_preprocessed.shape) < 2 or image_preprocessed.size == 0: + logger.warning(f"Image de forme invalide pour l'OCR: {image_preprocessed.shape if hasattr(image_preprocessed, 'shape') else 'sans forme'}") + return "" + except Exception as e: + logger.error(f"Erreur lors de la vérification de l'image pour OCR: {e}") + return "" + + try: + import pytesseract + + # Configuration spécifique pour l'écriture manuscrite + # --psm 6: Suppose un seul bloc de texte uniforme + # --oem 1: Utiliser le moteur LSTM pour de meilleurs résultats sur l'écriture manuscrite + custom_config = r'--oem 1 --psm 6 -l fra' + + # Ajouter des mots spécifiques au contexte des montants + # pour aider la reconnaissance + custom_config += r' -c tessedit_char_whitelist="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789éèêëàâäôöùûüç -"' + + # Essayer de sauvegarder l'image pour debug avant OCR + try: + cv2.imwrite("debug_pre_ocr.png", image_preprocessed) + logger.info("Image pré-OCR sauvegardée pour debug") + except Exception as e: + logger.warning(f"Impossible de sauvegarder l'image pré-OCR: {e}") + + # Effectuer l'OCR + texte = pytesseract.image_to_string(image_preprocessed, config=custom_config) + + if not texte or texte.isspace(): + logger.warning("OCR n'a extrait aucun texte") + # Essayer avec une configuration alternative + alt_config = r'--oem 0 --psm 3 -l fra' + texte = pytesseract.image_to_string(image_preprocessed, config=alt_config) + logger.info("OCR alternatif utilisé") + + return texte + + except Exception as e: + logger.error(f"Erreur lors de l'OCR manuscrit: {e}") + # Essayer de sauvegarder l'image en cas d'erreur pour diagnostic + try: + cv2.imwrite("debug_manuscrit_error.png", image_preprocessed) + logger.info("Image d'erreur sauvegardée pour diagnostic") + except Exception as e2: + logger.warning(f"Impossible de sauvegarder l'image d'erreur: {e2}") + return "" + + +def extraire_montant_depuis_texte(texte): + """ + Extrait le montant en lettres à partir du texte OCR + Utilise des patterns spécifiques aux formulations sur les chèques français + """ + # Nettoyage du texte + texte = texte.lower().replace('\n', ' ').strip() + + # Patterns communs pour les montants sur les chèques français + patterns = [ + r"payez (?:contre ce chèque )?(?:à l'ordre de|à) .+? (?:la )?somme de (.+?)(?:euros|€|\n|$)", + r"la somme de (.+?)(?:euros|€|\n|$)", + r"montant: (.+?)(?:euros|€|\n|$)", + r"([a-z]+ mille [a-z]+ cent [a-z]+ [a-z]+)", # ex: "deux mille trois cent cinquante euros" + r"([a-z]+ mille [a-z]+ [a-z]+)", # ex: "deux mille cinquante euros" + r"([a-z]+ cent [a-z]+ [a-z]+)", # ex: "trois cent cinquante euros" + ] + + # Essayer chaque pattern + for pattern in patterns: + matches = re.search(pattern, texte) + if matches: + montant_texte = matches.group(1).strip() + return montant_texte + + # Si aucun pattern ne correspond, retourner le texte entier + # pour traitement manuel ou analyse plus poussée + return texte + + +def valider_montant_lettres(montant_texte): + """ + Valide et normalise le montant extrait en lettres + Corrige les erreurs courantes d'OCR et normalise la formulation + """ + if not montant_texte: + return None + + # Corrections courantes des erreurs d'OCR + corrections = { + "dcux": "deux", "trais": "trois", "quatrc": "quatre", + "cinq": "cinq", "s1x": "six", "scpt": "sept", + "hu1t": "huit", "neut": "neuf", "d1x": "dix", + "vingr": "vingt", "trenre": "trente", "quarantc": "quarante", + "cinquantc": "cinquante", "soixantc": "soixante", + "miIIe": "mille", "milIe": "mille", + "ccnt": "cent", "cen+": "cent" + } + + # Appliquer les corrections + for erreur, correction in corrections.items(): + montant_texte = montant_texte.replace(erreur, correction) + + # Supprimer les termes non pertinents + non_pertinents = ["euros", "euro", "€", "francs", "exactement", "et"] + for terme in non_pertinents: + montant_texte = montant_texte.replace(terme, "") + + # Normaliser les espaces + montant_texte = " ".join(montant_texte.split()) + + return montant_texte + + +def convertir_texte_en_nombre(texte_montant): + """ + Convertit un montant écrit en lettres en valeur numérique + Gère les spécificités de la langue française + """ + # Dictionnaire de conversion français + valeurs = { + "zéro": 0, "zero": 0, + "un": 1, "une": 1, + "deux": 2, + "trois": 3, + "quatre": 4, + "cinq": 5, + "six": 6, + "sept": 7, + "huit": 8, + "neuf": 9, + "dix": 10, + "onze": 11, + "douze": 12, + "treize": 13, + "quatorze": 14, + "quinze": 15, + "seize": 16, + "dix-sept": 17, "dix sept": 17, + "dix-huit": 18, "dix huit": 18, + "dix-neuf": 19, "dix neuf": 19, + "vingt": 20, + "trente": 30, + "quarante": 40, + "cinquante": 50, + "soixante": 60, + "soixante-dix": 70, "soixante dix": 70, + "quatre-vingt": 80, "quatre vingt": 80, + "quatre-vingt-dix": 90, "quatre vingt dix": 90, + "cent": 100, + "mille": 1000, + "million": 1000000, + "millions": 1000000, + "milliard": 1000000000, + "milliards": 1000000000 + } + + if not texte_montant: + return None + + try: + # Prétraitement du texte + texte = texte_montant.lower().replace('-', ' ').replace(' ', ' ').strip() + texte = texte.replace('euros', '').replace('euro', '').replace('€', '').strip() + + # Gestion des décimales + parties = texte.split('virgule') + partie_entiere = parties[0].strip() + partie_decimale = parties[1].strip() if len(parties) > 1 else None + + # Analyseur syntaxique simplifié pour la partie entière + mots = partie_entiere.split() + total = 0 + sous_total = 0 + + for mot in mots: + if mot in valeurs: + valeur = valeurs[mot] + + if valeur in [100, 1000, 1000000, 1000000000]: + # Multiplicateurs + if sous_total == 0: + sous_total = 1 + sous_total *= valeur + + if valeur >= 1000: + total += sous_total + sous_total = 0 + else: + # Valeurs simples + sous_total += valeur + + total += sous_total + + # Traiter la partie décimale si présente + if partie_decimale: + decimale = 0 + mots_decimale = partie_decimale.split() + + # Cas spécial pour "cinquante centimes", etc. + if "centime" in partie_decimale or "centimes" in partie_decimale: + for mot in mots_decimale: + if mot in valeurs and mot not in ["centime", "centimes"]: + decimale = valeurs[mot] + return total + (decimale / 100) + + # Sinon, conversion normale + for mot in mots_decimale: + if mot in valeurs: + decimale = decimale * 10 + valeurs[mot] + + # Ajuster au format décimal (max 99 centimes) + if decimale > 99: + decimale = 99 + + return total + (decimale / 100) + + return total + + except Exception as e: + logger.error(f"Erreur lors de la conversion du montant en lettres: {e}") + return None + + +def verifier_coherence_montants(montant_chiffres, montant_lettres): + """ + Vérifie la cohérence entre le montant en chiffres et en lettres + Retourne un score de confiance et des détails sur la comparaison + """ + # Convertir le montant en chiffres en nombre + if isinstance(montant_chiffres, str): + # Nettoyer la chaîne (enlever symboles, espaces) + montant_chiffres = montant_chiffres.replace('€', '').replace(' ', '').strip() + # Remplacer la virgule par un point pour la conversion + montant_chiffres = montant_chiffres.replace(',', '.') + try: + montant_chiffres_num = float(montant_chiffres) + except ValueError: + return {"coherent": False, "raison": "Montant en chiffres non numérique", "confiance": 0} + else: + montant_chiffres_num = float(montant_chiffres) + + # Convertir le montant en lettres en nombre + montant_lettres_num = convertir_texte_en_nombre(montant_lettres) + + if montant_lettres_num is None: + return {"coherent": False, "raison": "Montant en lettres non convertible", "confiance": 0} + + # Calculer la différence relative + if montant_chiffres_num == 0 and montant_lettres_num == 0: + diff_relative = 0 + else: + diff_relative = abs(montant_chiffres_num - montant_lettres_num) / max(montant_chiffres_num, montant_lettres_num) + + # Déterminer la cohérence + if diff_relative == 0: + return {"coherent": True, "raison": "Montants identiques", "confiance": 1.0} + elif diff_relative < 0.01: + return {"coherent": True, "raison": "Montants quasi-identiques", "confiance": 0.95} + elif diff_relative < 0.1: + return {"coherent": False, "raison": "Montants légèrement différents", "confiance": 0.5} + else: + return {"coherent": False, "raison": "Montants incohérents", "confiance": 0} # Fonctions d'extraction pour des champs spécifiques def extraire_montant(texte: str) -> Optional[str]: """Extrait le montant à partir du texte OCR""" - # Recherche de motifs comme "€ 123,45" ou "123,45 €" ou "123 €" + # Recherche de motifs comme "€ 123,45" ou "123,45 €" ou "123 €" ou "123,45" ou "123 euros" motifs = [ - r'(\d+[.,]\d{2})\s*€', # 123,45 € - r'€\s*(\d+[.,]\d{2})', # € 123,45 - r'(\d+)\s*€' # 123 € + r'(\d+[.,]\d{2})\s*€', # 123,45 € + r'€\s*(\d+[.,]\d{2})', # € 123,45 + r'(\d+)\s*€', # 123 € + r'(\d+[.,]\d{2})', # 123,45 (sans symbole) + r'(\d+[.,]\d{2})\s*[Ee][Uu][Rr]', # 123,45 EUR + r'(\d+[.,]\d{2})\s*euros?', # 123,45 euros + r'(\d+)\s*euros?' # 123 euros ] for motif in motifs: @@ -216,9 +1361,17 @@ def extraire_date(texte: str) -> Optional[str]: """Extrait la date à partir du texte OCR""" # Recherche de dates au format JJ/MM/AAAA ou JJ-MM-AAAA - match = re.search(r'(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})', texte) - if match: - return match.group(1) + # Plusieurs formats de dates possibles + formats_date = [ + r'(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})', # 01/01/2023 ou 01-01-2023 + r'(\d{1,2}\s+[a-zéûôA-Z]+\s+\d{2,4})', # 01 janvier 2023 + r'([a-zéûôA-Z]+\s+\d{1,2}\s+\d{2,4})' # janvier 01 2023 + ] + + for format_date in formats_date: + match = re.search(format_date, texte, re.IGNORECASE) + if match: + return match.group(1) logger.warning("Aucune date détectée dans le texte") return None @@ -229,10 +1382,17 @@ def extraire_beneficiaire(texte: str) -> Optional[str]: # Cette fonction est plus complexe car elle dépend du format du chèque # Recherche souvent après "Payez contre ce chèque à" ou similaire - match = re.search(r'(?:Payez|Payé)(?:\s+contre\s+ce\s+chèque)?\s+(?:à|au?)\s+([A-Z\s]+)', - texte, re.IGNORECASE) - if match: - return match.group(1).strip() + # Plusieurs formulations possibles pour le bénéficiaire + motifs = [ + r'(?:Payez|Payé)(?:\s+contre\s+ce\s+chèque)?\s+(?:à|au?)\s+([A-Z0-9\s\.,-]+)', + r'(?:à l\'ordre de|ordre)(?:\s+:)?\s+([A-Z0-9\s\.,-]+)', + r'(?:bénéficiaire|beneficiaire)(?:\s+:)?\s+([A-Z0-9\s\.,-]+)' + ] + + for motif in motifs: + match = re.search(motif, texte, re.IGNORECASE) + if match: + return match.group(1).strip() logger.warning("Aucun bénéficiaire détecté dans le texte") return None @@ -242,34 +1402,88 @@ def extraire_numero_cheque(texte: str) -> Optional[str]: """Extrait le numéro de chèque à partir du texte OCR""" # Recherche d'un numéro de chèque (généralement 7 chiffres) - match = re.search(r'N°\s*(\d{7})', texte) - if match: - return match.group(1) + # Recherche de formats de numéro de chèque + motifs = [ + r'N[o°]\s*(\d{7})', # N° 1234567 + r'(?:chèque|cheque)\s*(?:n[o°]?)?\s*(\d{7})', # chèque n° 1234567 + r'(? str: - """Évalue la qualité de l'extraction basée sur les champs extraits""" + """ + Évalue la qualité de l'extraction basée sur les champs extraits + Prend en compte les informations MICR et le montant en lettres + """ + # Définir les champs importants par catégorie + champs_classiques = ["montant", "date", "beneficiaire", "numero_cheque"] + champs_micr = ["code_banque", "code_guichet", "numero_compte", "cle_rib"] + champs_avances = ["montant_lettres", "coherence_montants"] - # Compter le nombre de champs extraits avec succès - champs_extraits = sum(1 for v in infos.values() if v is not None) - total_champs = 4 # montant, date, beneficiaire, numero_cheque + # Compter les champs extraits dans chaque catégorie + n_classiques = sum(1 for c in champs_classiques if infos.get(c) is not None) + n_micr = sum(1 for c in champs_micr if infos.get(c) is not None) + n_avances = sum(1 for c in champs_avances if infos.get(c) is not None) - if champs_extraits == 0: - return "échec" - elif champs_extraits < 2: - return "faible" - elif champs_extraits < 4: - return "moyenne" + total_extraits = n_classiques + n_micr + n_avances + methode = infos.get("methode", "") + + # Évaluation pour les méthodes avancées (hybride ou hybride_avance) + if "hybride" in methode: + # Méthode hybride_avance: évaluer sur tous les champs + if methode == "hybride_avance": + max_champs = len(champs_classiques) + len(champs_micr) + len(champs_avances) + if total_extraits == 0: + return "échec" + elif total_extraits < max_champs * 0.3: # Moins de 30% + return "faible" + elif total_extraits < max_champs * 0.7: # Entre 30% et 70% + return "moyenne" + else: # Plus de 70% + return "bonne" + + # Méthode hybride standard: évaluer sur les champs classiques et MICR + else: + max_champs = len(champs_classiques) + len(champs_micr) + total_pertinent = n_classiques + n_micr + if total_pertinent == 0: + return "échec" + elif total_pertinent < max_champs * 0.3: + return "faible" + elif total_pertinent < max_champs * 0.7: + return "moyenne" + else: + return "bonne" + + # Évaluation pour la méthode MICR seule + elif methode == "micr_cmc7": + if n_micr == 0: + return "échec" + elif n_micr < 2: + return "faible" + elif n_micr < 4: + return "moyenne" + else: + return "bonne" + + # Évaluation pour les méthodes classiques (ocr ou cv) else: - return "bonne" + if n_classiques == 0: + return "échec" + elif n_classiques < 2: + return "faible" + elif n_classiques < 4: + return "moyenne" + else: + return "bonne" def get_tessdata_path() -> Optional[str]: diff --git a/worker/app/tasks.py b/worker/app/tasks.py index 957bfb5..5d7ffd5 100644 --- a/worker/app/tasks.py +++ b/worker/app/tasks.py @@ -11,6 +11,7 @@ import traceback import redis from datetime import datetime from rq import get_current_job +from json import JSONEncoder # Ajouter le module d'extraction au path sys.path.append('/app/shared') @@ -56,7 +57,8 @@ def update_job_status(job_id, status, message=None, progress=None, result=None, update_data["progress"] = str(progress) if result: - update_data["result"] = str(result) + # Utiliser json.dumps pour une sérialisation correcte + update_data["result"] = json.dumps(result) if texte_brut: update_data["texte_brut"] = texte_brut @@ -82,13 +84,14 @@ def update_job_status(job_id, status, message=None, progress=None, result=None, return False -def process_cheque_image(file_path, job_id): +def process_cheque_image(job_id, file_path, **kwargs): """ Traite une image de chèque pour en extraire les informations Args: - file_path (str): Chemin vers l'image à traiter job_id (str): Identifiant de la tâche + file_path (str): Chemin vers l'image à traiter + **kwargs: Paramètres supplémentaires (ignorés, pour compatibilité avec RQ) Returns: dict: Résultat de l'extraction @@ -135,12 +138,12 @@ def process_cheque_image(file_path, job_id): infos, texte = extraire_infos_cheque( chemin_image=file_path, - methode="ocr", + methode="hybride_avance", language=DEFAULT_OCR_LANGUAGE, tessdata=tessdata_path ) - methode = "ocr" + methode = "hybride_avance" except Exception as e: logger.warning(f"Échec de la première tentative: {str(e)}") @@ -157,31 +160,50 @@ def process_cheque_image(file_path, job_id): infos, texte = extraire_infos_cheque( chemin_image=file_path, - methode="ocr", + methode="hybride_avance", language=ALTERNATIVE_OCR_LANGUAGE, tessdata=tessdata_path ) - methode = "ocr" + methode = "hybride_avance" except Exception as e2: logger.warning(f"Échec de la deuxième tentative: {str(e2)}") - # Troisième tentative avec la méthode CV - logger.info("Tentative d'extraction avec la méthode CV (sans OCR)") + # Troisième tentative avec la méthode hybride (combinant MICR et extraction du montant en lettres) + logger.info("Tentative d'extraction avec la méthode hybride (MICR + montant en lettres)") update_job_status( job_id=job_id, status="processing", - message="Extraction par détection de zones (sans OCR)", + message="Extraction MICR et montant en lettres", progress=70 ) infos, texte = extraire_infos_cheque( chemin_image=file_path, - methode="cv" + methode="hybride", + language=DEFAULT_OCR_LANGUAGE, + tessdata=tessdata_path ) - methode = "cv" + methode = "hybride" + + # Si échec, dernière tentative avec la méthode MICR seule + if not infos.get("montant") and not infos.get("code_banque"): + logger.info("Tentative d'extraction avec la méthode MICR uniquement") + update_job_status( + job_id=job_id, + status="processing", + message="Extraction MICR uniquement", + progress=80 + ) + + infos, texte = extraire_infos_cheque( + chemin_image=file_path, + methode="micr" + ) + + methode = "micr" # Mise à jour finale update_job_status(