Files
scan-ocr-cheques/api/app/main.py
2025-07-09 06:40:36 +02:00

575 lines
20 KiB
Python

"""
API principale pour le service d'extraction d'informations de chèques
"""
import os
import time
import uuid
import shutil
import json
from typing import List, Optional
from fastapi import FastAPI, File, UploadFile, HTTPException, Depends, Header, BackgroundTasks, Query, Request, Path
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from datetime import datetime, timedelta
from pathlib import Path as FilePath
import redis
from rq import Queue
from rq.job import Job
from rq.registry import StartedJobRegistry, FinishedJobRegistry, FailedJobRegistry
import logging
from .config import settings
from .schemas import (
UploadResponse, JobStatusResponse, JobResult,
ExtractionResult, HealthCheck, ErrorResponse, JobStatus,
ConversionResponse, ConversionStatus
)
from .conversion import is_modern_format, convert_file, get_supported_input_formats
from .dependencies import verify_api_key, get_redis_connection
from .tasks import process_cheque_image
# Configuration du logging
logging.basicConfig(
level=logging.INFO if not settings.DEBUG else logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("cheque_scanner_api")
# Création de l'application FastAPI
app = FastAPI(
title=settings.APP_NAME,
description="API pour l'extraction d'informations de chèques à partir d'images",
version=settings.API_VERSION,
docs_url=f"{settings.API_PREFIX}/docs",
redoc_url=f"{settings.API_PREFIX}/redoc",
openapi_url=f"{settings.API_PREFIX}/openapi.json"
)
# Middleware CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # À ajuster en production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Configuration des fichiers statiques pour accéder aux images de résultats et fichiers convertis
app.mount("/static", StaticFiles(directory=settings.RESULT_FOLDER), name="static")
app.mount("/static/converted", StaticFiles(directory=settings.CONVERTED_FOLDER), name="converted_files")
# Variable pour stocker le temps de démarrage
start_time = time.time()
@app.get(f"{settings.API_PREFIX}/health", response_model=HealthCheck, tags=["Système"])
async def health_check(redis_conn: redis.Redis = Depends(get_redis_connection)):
"""
Vérifie l'état de santé de l'API et des services associés
"""
# Vérifier la connexion Redis
try:
redis_conn.ping()
redis_status = "ok"
except Exception as e:
logger.error(f"Erreur Redis: {str(e)}")
redis_status = f"error: {str(e)}"
# Obtenir le nombre de workers actifs
registry = StartedJobRegistry(settings.QUEUE_NAME, connection=redis_conn)
worker_count = len(registry.get_job_ids())
# Obtenir la taille de la file d'attente
queue = Queue(settings.QUEUE_NAME, connection=redis_conn)
queue_size = len(queue.get_job_ids())
# Calculer le temps de fonctionnement
uptime_seconds = time.time() - start_time
days, remainder = divmod(uptime_seconds, 86400)
hours, remainder = divmod(remainder, 3600)
minutes, seconds = divmod(remainder, 60)
uptime = f"{int(days)}d {int(hours)}h {int(minutes)}m {int(seconds)}s"
return HealthCheck(
status="ok",
version=settings.API_VERSION,
redis_status=redis_status,
worker_count=worker_count,
queue_size=queue_size,
uptime=uptime
)
@app.post(
f"{settings.API_PREFIX}/upload",
response_model=UploadResponse,
tags=["Extraction"],
status_code=202
)
async def upload_image(
background_tasks: BackgroundTasks,
file: UploadFile = File(...),
priority: bool = Query(False, description="Traiter avec une priorité élevée"),
api_key: str = Depends(verify_api_key),
redis_conn: redis.Redis = Depends(get_redis_connection)
):
"""
Télécharge une image de chèque et lance son traitement
"""
# Vérifier l'extension du fichier
file_ext = os.path.splitext(file.filename)[1].lower().lstrip(".")
if file_ext not in settings.ALLOWED_EXTENSIONS:
raise HTTPException(
status_code=400,
detail=f"Format de fichier non pris en charge. Formats acceptés: {', '.join(settings.ALLOWED_EXTENSIONS)}"
)
# Créer un identifiant unique pour la tâche
job_id = str(uuid.uuid4())
# Créer le chemin de fichier
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{timestamp}_{job_id}.{file_ext}"
file_path = os.path.join(settings.UPLOAD_FOLDER, filename)
# Sauvegarder le fichier
try:
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde du fichier: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Erreur lors de la sauvegarde de l'image: {str(e)}"
)
# Déterminer la file d'attente en fonction de la priorité
queue_name = settings.HIGH_PRIORITY_QUEUE_NAME if priority else settings.QUEUE_NAME
queue = Queue(queue_name, connection=redis_conn)
# Créer une tâche RQ
try:
# Utiliser directement le nom complet du module dans le worker
queue_job = queue.enqueue(
'tasks.process_cheque_image',
job_id,
file_path,
result_ttl=settings.RESULT_TTL,
timeout=settings.JOB_TIMEOUT
)
# Stocker les métadonnées dans Redis
redis_conn.hset(f"job:{job_id}", mapping={
"status": JobStatus.PENDING.value,
"created_at": datetime.now().isoformat(),
"file_path": file_path,
"filename": file.filename,
"priority": str(priority).lower()
})
logger.info(f"Tâche créée: {job_id} - Fichier: {file.filename}")
return UploadResponse(
job_id=job_id,
status=JobStatus.PENDING,
message=f"Image en file d'attente pour traitement (file {'prioritaire' if priority else 'standard'})"
)
except Exception as e:
logger.error(f"Erreur lors de la création de la tâche: {str(e)}")
# Supprimer le fichier en cas d'erreur
if os.path.exists(file_path):
os.remove(file_path)
raise HTTPException(
status_code=500,
detail=f"Erreur lors de la création de la tâche: {str(e)}"
)
@app.post(
f"{settings.API_PREFIX}/convert",
response_model=ConversionResponse,
tags=["Conversion"],
summary="Convertit un fichier d'un format moderne vers un format standard"
)
async def convert_image(
file: UploadFile = File(...),
target_format: Optional[str] = Query(None, description="Format cible (jpg, png)"),
api_key: str = Depends(verify_api_key)
):
"""
Convertit une image d'un format moderne (HEIC, WebP, RAW, etc.) vers un format standard (JPG, PNG)
Cette API prend en charge les formats suivants:
- Formats iPhone: HEIC, HEIF
- Formats Android haute qualité: WebP, DNG
- Formats RAW: CR2, ARW, NEF, RAW
La conversion est effectuée côté serveur et le fichier converti est renvoyé.
"""
# Vérifier si le format d'entrée nécessite une conversion
filename = file.filename
file_ext = FilePath(filename).suffix.lower().lstrip(".")
# Si le format est déjà pris en charge, renvoyer une erreur
if file_ext in settings.ALLOWED_EXTENSIONS and not target_format:
return ConversionResponse(
original_filename=filename,
original_format=file_ext,
status=ConversionStatus.FAILED,
message="Le fichier est déjà dans un format pris en charge, aucune conversion nécessaire"
)
# Si le format n'est pas pris en charge du tout
if file_ext not in settings.ALLOWED_EXTENSIONS and file_ext not in settings.MODERN_EXTENSIONS:
return ConversionResponse(
original_filename=filename,
original_format=file_ext,
status=ConversionStatus.FAILED,
message=f"Format non pris en charge: {file_ext}. Formats supportés: {', '.join(get_supported_input_formats())}"
)
# Sauvegarder le fichier temporairement
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
temp_filename = f"{timestamp}_{FilePath(filename).stem}.{file_ext}"
temp_path = FilePath(settings.TEMP_FOLDER) / temp_filename
try:
with open(temp_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde du fichier temporaire: {str(e)}")
return ConversionResponse(
original_filename=filename,
original_format=file_ext,
status=ConversionStatus.FAILED,
message=f"Erreur lors de la sauvegarde du fichier: {str(e)}"
)
# Déterminer le format de sortie si spécifié
output_format = target_format if target_format else None
# Effectuer la conversion
success, converted_path, error_message = convert_file(
str(temp_path),
settings.CONVERTED_FOLDER
)
# Nettoyer le fichier temporaire
if FilePath(temp_path).exists():
FilePath(temp_path).unlink()
if not success:
return ConversionResponse(
original_filename=filename,
original_format=file_ext,
status=ConversionStatus.FAILED,
message=error_message if error_message else "Échec de la conversion pour une raison inconnue"
)
# Préparer le chemin relatif pour l'URL
converted_filename = FilePath(converted_path).name
converted_format = FilePath(converted_path).suffix.lower().lstrip(".")
# Construire l'URL pour accéder au fichier converti
converted_url = f"/static/converted/{converted_filename}"
return ConversionResponse(
original_filename=filename,
original_format=file_ext,
converted_filename=converted_filename,
converted_format=converted_format,
converted_path=converted_url,
status=ConversionStatus.SUCCESS,
message=f"Conversion réussie de {file_ext} vers {converted_format}"
)
@app.post(
f"{settings.API_PREFIX}/convert-and-process",
response_model=UploadResponse,
tags=["Conversion"],
status_code=202,
summary="Convertit un fichier et lance son traitement"
)
async def convert_and_process_image(
background_tasks: BackgroundTasks,
file: UploadFile = File(...),
priority: bool = Query(False, description="Traiter avec une priorité élevée"),
api_key: str = Depends(verify_api_key),
redis_conn: redis.Redis = Depends(get_redis_connection)
):
"""
Convertit une image d'un format moderne vers un format standard, puis lance son traitement d'extraction
Cette API combine la conversion et le traitement en une seule étape pour les fichiers provenant
d'appareils modernes (iPhone, Android haut de gamme).
"""
# Vérifier le format du fichier
filename = file.filename
file_ext = FilePath(filename).suffix.lower().lstrip(".")
# Créer un identifiant unique pour la tâche
job_id = str(uuid.uuid4())
# Si le format est déjà pris en charge, procéder directement au traitement
if file_ext in settings.ALLOWED_EXTENSIONS:
# Créer le chemin de fichier
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
new_filename = f"{timestamp}_{job_id}.{file_ext}"
file_path = os.path.join(settings.UPLOAD_FOLDER, new_filename)
# Sauvegarder le fichier
try:
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde du fichier: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Erreur lors de la sauvegarde de l'image: {str(e)}"
)
# Si le format nécessite une conversion
elif file_ext in settings.MODERN_EXTENSIONS:
# Sauvegarder le fichier temporairement
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
temp_filename = f"{timestamp}_{job_id}.{file_ext}"
temp_path = FilePath(settings.TEMP_FOLDER) / temp_filename
try:
with open(temp_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde du fichier temporaire: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Erreur lors de la sauvegarde de l'image: {str(e)}"
)
# Effectuer la conversion
success, converted_path, error_message = convert_file(
str(temp_path),
settings.UPLOAD_FOLDER
)
# Nettoyer le fichier temporaire
if FilePath(temp_path).exists():
FilePath(temp_path).unlink()
if not success:
raise HTTPException(
status_code=500,
detail=f"Erreur lors de la conversion: {error_message if error_message else 'Raison inconnue'}"
)
# Utiliser le fichier converti
file_path = converted_path
else:
# Format non pris en charge
raise HTTPException(
status_code=400,
detail=f"Format de fichier non pris en charge. Formats acceptés: " +
f"{', '.join(settings.ALLOWED_EXTENSIONS)} et {', '.join(settings.MODERN_EXTENSIONS)}"
)
# Déterminer la file d'attente en fonction de la priorité
queue_name = settings.HIGH_PRIORITY_QUEUE_NAME if priority else settings.QUEUE_NAME
queue = Queue(queue_name, connection=redis_conn)
# Créer une tâche RQ
try:
# Utiliser directement le nom complet du module dans le worker
queue_job = queue.enqueue(
'tasks.process_cheque_image',
job_id,
file_path,
result_ttl=settings.RESULT_TTL,
timeout=settings.JOB_TIMEOUT
)
# Stocker les métadonnées dans Redis
redis_conn.hset(f"job:{job_id}", mapping={
"status": JobStatus.PENDING.value,
"created_at": datetime.now().isoformat(),
"file_path": file_path,
"filename": filename,
"priority": str(priority).lower(),
"converted": "true" if file_ext in settings.MODERN_EXTENSIONS else "false"
})
logger.info(f"Tâche créée: {job_id} - Fichier: {filename} (converti: {file_ext in settings.MODERN_EXTENSIONS})")
return UploadResponse(
job_id=job_id,
status=JobStatus.PENDING,
message=f"Image {'convertie et ' if file_ext in settings.MODERN_EXTENSIONS else ''}en file d'attente pour traitement (file {'prioritaire' if priority else 'standard'})"
)
except Exception as e:
logger.error(f"Erreur lors de la création de la tâche: {str(e)}")
# Supprimer le fichier en cas d'erreur
if os.path.exists(file_path):
os.remove(file_path)
raise HTTPException(
status_code=500,
detail=f"Erreur lors de la création de la tâche: {str(e)}"
)
@app.get(
f"{settings.API_PREFIX}/supported-formats",
response_model=List[str],
tags=["Conversion"],
summary="Liste tous les formats de fichiers pris en charge"
)
async def list_supported_formats(
api_key: str = Depends(verify_api_key)
):
"""
Renvoie la liste de tous les formats de fichiers pris en charge par l'API
Cela inclut à la fois les formats standards et les formats modernes qui nécessitent une conversion.
"""
return get_supported_input_formats()
@app.get(
f"{settings.API_PREFIX}/status/{{job_id}}",
response_model=JobStatusResponse,
tags=["Extraction"]
)
async def get_job_status(
job_id: str,
api_key: str = Depends(verify_api_key),
redis_conn: redis.Redis = Depends(get_redis_connection)
):
"""
Vérifie l'état d'une tâche d'extraction
"""
# Vérifier si la tâche existe
if not redis_conn.exists(f"job:{job_id}"):
raise HTTPException(
status_code=404,
detail=f"Tâche non trouvée: {job_id}"
)
# Récupérer les métadonnées de la tâche
job_data = {k.decode(): v.decode() for k, v in redis_conn.hgetall(f"job:{job_id}").items()}
# Récupérer l'état de la tâche RQ
queue_name = settings.HIGH_PRIORITY_QUEUE_NAME if job_data.get("priority") == "true" else settings.QUEUE_NAME
queue = Queue(queue_name, connection=redis_conn)
status = job_data.get("status", JobStatus.PENDING.value)
message = job_data.get("message", "En attente de traitement")
# Vérifier si la tâche est dans différentes files d'attente
progress = None
queue_position = None
# Si la tâche est en attente, déterminer sa position dans la file
if status == JobStatus.PENDING.value:
job_ids = queue.get_job_ids()
if job_id in job_ids:
queue_position = job_ids.index(job_id) + 1
# Si la tâche est en cours, récupérer la progression
elif status == JobStatus.PROCESSING.value:
progress = job_data.get("progress")
if progress:
progress = int(progress)
return JobStatusResponse(
job_id=job_id,
status=JobStatus(status),
message=message,
created_at=datetime.fromisoformat(job_data.get("created_at")),
updated_at=datetime.fromisoformat(job_data.get("updated_at")) if "updated_at" in job_data else None,
progress=progress,
queue_position=queue_position
)
@app.get(
f"{settings.API_PREFIX}/result/{{job_id}}",
response_model=JobResult,
tags=["Extraction"]
)
async def get_job_result(
job_id: str,
api_key: str = Depends(verify_api_key),
redis_conn: redis.Redis = Depends(get_redis_connection)
):
"""
Récupère les résultats d'une tâche d'extraction terminée
"""
# Vérifier si la tâche existe
if not redis_conn.exists(f"job:{job_id}"):
raise HTTPException(
status_code=404,
detail=f"Tâche non trouvée: {job_id}"
)
# Récupérer les métadonnées de la tâche
job_data = {k.decode(): v.decode() for k, v in redis_conn.hgetall(f"job:{job_id}").items()}
# Vérifier si la tâche est terminée
status = JobStatus(job_data.get("status", JobStatus.PENDING.value))
if status != JobStatus.COMPLETED and status != JobStatus.FAILED:
raise HTTPException(
status_code=400,
detail=f"La tâche n'est pas encore terminée. Statut actuel: {status.value}"
)
# Récupérer les résultats d'extraction si disponibles
result = None
texte_brut = None
if status == JobStatus.COMPLETED:
# Charger les résultats depuis Redis
result_data = job_data.get("result")
if result_data:
result_dict = json.loads(result_data) # Utiliser json.loads au lieu de eval
result = ExtractionResult(**result_dict)
texte_brut = job_data.get("texte_brut")
return JobResult(
job_id=job_id,
status=status,
created_at=datetime.fromisoformat(job_data.get("created_at")),
completed_at=datetime.fromisoformat(job_data.get("completed_at")) if "completed_at" in job_data else None,
image_path=job_data.get("file_path"),
result=result,
texte_brut=texte_brut,
methode=job_data.get("methode", "inconnu"),
erreur=job_data.get("erreur") if status == JobStatus.FAILED else None
)
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
"""Gestionnaire d'exceptions global"""
logger.error(f"Exception non gérée: {str(exc)}")
return JSONResponse(
status_code=500,
content=ErrorResponse(
message="Erreur interne du serveur",
error_code="INTERNAL_SERVER_ERROR",
details={"error": str(exc)}
).dict()
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"main:app",
host=settings.HOST,
port=settings.PORT,
reload=settings.DEBUG
)