diff --git a/.gitignore b/.gitignore index 8a28d1d..5eceadd 100644 --- a/.gitignore +++ b/.gitignore @@ -14,4 +14,5 @@ my.settings.yml jobs.db-* venv312 models -config \ No newline at end of file +config +app.log* \ No newline at end of file diff --git a/main.py b/main.py index d45db65..d617f91 100644 --- a/main.py +++ b/main.py @@ -1,5 +1,5 @@ # main.py (merged) - +import threading import logging import shutil import subprocess @@ -19,7 +19,9 @@ from typing import Dict, List, Any, Optional import resource from threading import Semaphore from logging.handlers import RotatingFileHandler -from urllib.parse import urljoin +from urllib.parse import urljoin, urlparse +from io import BytesIO +import zipfile import sys import re import importlib @@ -28,15 +30,16 @@ import time import ocrmypdf import pypdf import pytesseract -from PIL import Image +from pytesseract import TesseractNotFoundError +from PIL import Image, UnidentifiedImageError from faster_whisper import WhisperModel from fastapi import (Depends, FastAPI, File, Form, HTTPException, Request, UploadFile, status, Body) -from fastapi.responses import FileResponse, JSONResponse, RedirectResponse +from fastapi.responses import FileResponse, JSONResponse, RedirectResponse, StreamingResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer -from huey import SqliteHuey +from huey import SqliteHuey, crontab from pydantic import BaseModel, ConfigDict, field_serializer from sqlalchemy import (Column, DateTime, Integer, String, Text, create_engine, delete, event) @@ -51,6 +54,8 @@ from authlib.integrations.starlette_client import OAuth from dotenv import load_dotenv from piper import PiperVoice import wave +import io + load_dotenv() @@ -83,6 +88,12 @@ except ImportError: VoiceNotFoundError = None +try: + from PyPDF2 import PdfMerger + _HAS_PYPDF2 = True +except Exception: + _HAS_PYPDF2 = False + # Instantiate OAuth object (was referenced in code) oauth = OAuth() @@ -125,7 +136,7 @@ _model_semaphore = Semaphore(MODEL_CONCURRENCY) # --- Logging Setup --- logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') -_log_handler = RotatingFileHandler("app.log", maxBytes=10*1024*1024, backupCount=5) +_log_handler = RotatingFileHandler("app.log", maxBytes=10*1024*1024, backupCount=1) _log_formatter = logging.Formatter('%(asctime)s %(levelname)s %(name)s %(message)s') _log_handler.setFormatter(_log_formatter) logging.getLogger().addHandler(_log_handler) @@ -270,6 +281,7 @@ class Job(Base): __tablename__ = "jobs" id = Column(String, primary_key=True, index=True) user_id = Column(String, index=True, nullable=True) + parent_job_id = Column(String, index=True, nullable=True) task_type = Column(String, index=True) status = Column(String, default="pending") progress = Column(Integer, default=0) @@ -294,6 +306,7 @@ def get_db(): class JobCreate(BaseModel): id: str user_id: str | None = None + parent_job_id: str | None = None task_type: str original_filename: str input_filepath: str @@ -303,6 +316,7 @@ class JobCreate(BaseModel): class JobSchema(BaseModel): id: str + parent_job_id: str | None = None task_type: str status: str progress: int @@ -329,12 +343,20 @@ class FinalizeUploadPayload(BaseModel): output_format: str = "" callback_url: Optional[str] = None # For API chunked uploads +class JobSelection(BaseModel): + job_ids: List[str] # -------------------------------------------------------------------------------- # 
--- 3. CRUD OPERATIONS & WEBHOOKS
# --------------------------------------------------------------------------------
 def get_job(db: Session, job_id: str):
     return db.query(Job).filter(Job.id == job_id).first()
 
 def get_jobs(db: Session, user_id: str | None = None, skip: int = 0, limit: int = 100):
     query = db.query(Job)
@@ -376,6 +398,7 @@ def mark_job_as_completed(db: Session, job_id: str, output_filepath_str: str | N
         except Exception:
             logger.exception(f"Could not stat output file {output_filepath_str} for job {job_id}")
     db.commit()
+    return db_job
 
 
 def send_webhook_notification(job_id: str, app_config: Dict[str, Any], base_url: str):
@@ -440,18 +463,56 @@ PIPER_VOICES_CACHE: Dict[str, "PiperVoice"] = {}
 AVAILABLE_TTS_VOICES_CACHE: Dict[str, Any] | None = None
 
 
-def get_whisper_model(model_size: str, whisper_settings: dict) -> WhisperModel:
+_model_locks: Dict[str, threading.Lock] = {}
+_global_lock = threading.Lock()
+
+def get_whisper_model(model_size: str, whisper_settings: dict) -> WhisperModel:
+    # Fast path: cache hit without any locking
     if model_size in WHISPER_MODELS_CACHE:
-        logger.info(f"Reusing cached model '{model_size}'.")
+        logger.debug(f"Cache hit for model '{model_size}'")
         return WHISPER_MODELS_CACHE[model_size]
-    with _model_semaphore:
+
+    # Prepare for a potential load with minimal contention
+    model_lock = _get_or_create_model_lock(model_size)
+
+    # Critical section: check the cache again under the model-specific lock
+    with model_lock:
         if model_size in WHISPER_MODELS_CACHE:
             return WHISPER_MODELS_CACHE[model_size]
+
         logger.info(f"Loading Whisper model '{model_size}'...")
-        model = WhisperModel(model_size, device=whisper_settings.get("device", "cpu"), compute_type=whisper_settings.get('compute_type', 'int8'))
-        WHISPER_MODELS_CACHE[model_size] = model
-        logger.info(f"Model '{model_size}' loaded.")
-        return model
+        try:
+            # Initialization with validated settings
+            device = whisper_settings.get("device", "cpu")
+            compute_type = whisper_settings.get("compute_type", "int8")
+
+            # faster-whisper-specific optimizations
+            model = WhisperModel(
+                model_size,
+                device=device,
+                compute_type=compute_type,
+                cpu_threads=max(1, (os.cpu_count() or 2) // 2),  # Prevent CPU oversubscription; os.cpu_count() may return None
+                num_workers=1  # Optimal for most transcription workloads
+            )
+
+            # Atomic cache update
+            WHISPER_MODELS_CACHE[model_size] = model
+            logger.info(f"Model '{model_size}' loaded (device={device}, compute={compute_type})")
+            return model
+
+        except Exception as e:
+            logger.error(f"Model '{model_size}' failed to load: {e}", exc_info=True)
+            raise RuntimeError(f"Whisper model initialization failed: {e}") from e
+
+def _get_or_create_model_lock(model_size: str) -> threading.Lock:
+    """Thread-safe lock acquisition with minimal global contention."""
+    # Fast path: lock already exists
+    if model_size in _model_locks:
+        return _model_locks[model_size]
+
+    # Slow path: create the lock under the global lock
+    with _global_lock:
+        return _model_locks.setdefault(model_size, threading.Lock())
 
 def get_piper_voice(model_name: str, tts_settings: dict | None) -> "PiperVoice":
     """
@@ -900,14 +961,180 @@ def list_kokoro_languages_cli(timeout: int = 60) -> List[str]:
     return []
 
 
-def run_command(argv: TypingList[str], timeout: int = 300):
+def run_command(
+    argv: List[str],
+    timeout: int = 300,
+    max_output_size: int = 5 * 1024 * 1024  # 5 MB
+) -> subprocess.CompletedProcess:
+    """
+    Drop-in replacement for run_command.
+ - Incrementally reads stdout/stderr in separate threads to avoid unbounded memory growth. + - Keeps at most `max_output_size` characters per stream (first N chars). + - Enforces a timeout (graceful terminate then kill). + - Uses optional preexec function `_limit_resources_preexec` if present in globals. + - Raises Exception on non-zero exit or timeout; returns CompletedProcess on success. + """ + logger.debug("Executing command: %s with timeout=%ss", " ".join(argv), timeout) + + # quick sanity: ensure there's a program to execute (improves error clarity) try: - res = subprocess.run(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=timeout, preexec_fn=_limit_resources_preexec) - if res.returncode != 0: - raise Exception(f"Command failed with exit code {res.returncode}. Stderr: {res.stderr[:1000]}") - return res - except subprocess.TimeoutExpired: - raise Exception(f"Command timed out after {timeout}s") + exe = argv[0] + except Exception: + raise Exception("Invalid argv passed to run_command") + + preexec = globals().get("_limit_resources_preexec", None) + + # Buffers and state for threads + stdout_chunks = [] + stderr_chunks = [] + stdout_len = 0 + stderr_len = 0 + stdout_lock = threading.Lock() + stderr_lock = threading.Lock() + stdout_truncated = False + stderr_truncated = False + + def _reader(stream, chunks, lock, name): + nonlocal stdout_len, stderr_len, stdout_truncated, stderr_truncated + try: + while True: + data = stream.read(4096) + if not data: + break + with lock: + # choose which counters to use by stream identity + if name == "stdout": + if stdout_len < max_output_size: + # append as much as fits + remaining = max_output_size - stdout_len + to_append = data[:remaining] + chunks.append(to_append) + stdout_len += len(to_append) + if len(data) > remaining: + stdout_truncated = True + else: + stdout_truncated = True + else: + if stderr_len < max_output_size: + remaining = max_output_size - stderr_len + to_append = data[:remaining] + chunks.append(to_append) + stderr_len += len(to_append) + if len(data) > remaining: + stderr_truncated = True + else: + stderr_truncated = True + except Exception: + logger.exception("Reader thread for %s failed", name) + finally: + try: + stream.close() + except Exception: + pass + + # Start process + try: + proc = subprocess.Popen( + argv, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + preexec_fn=preexec + ) + except FileNotFoundError: + msg = f"Command not found: {argv[0]}" + logger.error(msg) + raise Exception(msg) + except Exception as e: + msg = f"Unexpected error launching command: {e}" + logger.exception(msg) + raise Exception(msg) + + # Start reader threads + t_stdout = threading.Thread(target=_reader, args=(proc.stdout, stdout_chunks, stdout_lock, "stdout"), daemon=True) + t_stderr = threading.Thread(target=_reader, args=(proc.stderr, stderr_chunks, stderr_lock, "stderr"), daemon=True) + t_stdout.start() + t_stderr.start() + + # Wait loop with timeout + start = time.monotonic() + try: + while True: + ret = proc.poll() + if ret is not None: + break + elapsed = time.monotonic() - start + if timeout and elapsed > timeout: + # Timeout -> try terminate -> kill if needed + logger.error("Command timed out after %ss: %s", timeout, " ".join(argv)) + try: + proc.terminate() + # give it a short while to exit + waited = 0.0 + while proc.poll() is None and waited < 2.0: + time.sleep(0.1) + waited += 0.1 + except Exception: + logger.exception("Failed to terminate process on timeout; attempting kill.") + if 
proc.poll() is None: + try: + proc.kill() + except Exception: + logger.exception("Failed to kill process after timeout.") + # ensure threads finish reading leftover data + try: + t_stdout.join(timeout=1.0) + t_stderr.join(timeout=1.0) + except Exception: + pass + raise Exception(f"Command timed out after {timeout}s: {' '.join(argv)}") + + # sleep a little to avoid busy loop + time.sleep(0.1) + + # process finished normally; allow readers to finish + t_stdout.join(timeout=2.0) + t_stderr.join(timeout=2.0) + + # build strings from chunks + with stdout_lock: + stdout_str = "".join(stdout_chunks) if stdout_chunks else "" + if stdout_truncated: + truncated_amount = " (truncated to max_output_size)" + stdout_str += f"\n[TRUNCATED - output larger than {max_output_size} bytes]{truncated_amount}" + with stderr_lock: + stderr_str = "".join(stderr_chunks) if stderr_chunks else "" + if stderr_truncated: + truncated_amount = " (truncated to max_output_size)" + stderr_str += f"\n[TRUNCATED - output larger than {max_output_size} bytes]{truncated_amount}" + + # Check return code + rc = proc.returncode + if rc != 0: + # include limited stderr snippet for diagnostics (like your original) + snippet = (stderr_str or "")[:1000] + msg = f"Command failed with exit code {rc}. Stderr: {snippet}" + logger.error(msg) + raise Exception(msg) + + logger.debug("Command completed successfully: %s", " ".join(argv)) + return subprocess.CompletedProcess(args=argv, returncode=rc, stdout=stdout_str, stderr=stderr_str) + + finally: + # ensure no resource leaks + try: + if proc.stdout: + try: + proc.stdout.close() + except Exception: + pass + if proc.stderr: + try: + proc.stderr.close() + except Exception: + pass + except Exception: + pass def validate_and_build_command(template_str: str, mapping: Dict[str, str]) -> TypingList[str]: fmt = Formatter() @@ -928,48 +1155,113 @@ def validate_and_build_command(template_str: str, mapping: Dict[str, str]) -> Ty return shlex.split(formatted) # --- TASK RUNNERS --- -# Each task now accepts app_config and base_url to facilitate webhook notifications @huey.task() def run_transcription_task(job_id: str, input_path_str: str, output_path_str: str, model_size: str, whisper_settings: dict, app_config: dict, base_url: str): db = SessionLocal() input_path = Path(input_path_str) + output_path = Path(output_path_str) + + # --- Constants --- + # Time in seconds between database checks for progress updates and cancellations. + # This avoids hammering the database on every single segment. + DB_POLL_INTERVAL_SECONDS = 5 + try: job = get_job(db, job_id) - if not job or job.status == 'cancelled': return - update_job_status(db, job_id, "processing") + if not job: + logger.warning(f"Job {job_id} not found. Aborting task.") + return + if job.status == 'cancelled': + logger.info(f"Job {job_id} was already cancelled before starting. 
Aborting.") + return + + update_job_status(db, job_id, "processing", progress=0) + model = get_whisper_model(model_size, whisper_settings) - logger.info(f"Starting transcription for job {job_id}") - segments, info = model.transcribe(str(input_path), beam_size=5) - full_transcript = [] - for segment in segments: - job_check = get_job(db, job_id) - if job_check.status == 'cancelled': - logger.info(f"Job {job_id} cancelled during transcription.") - return - if info.duration > 0: - progress = int((segment.end / info.duration) * 100) - update_job_status(db, job_id, "processing", progress=progress) - full_transcript.append(segment.text.strip()) - transcript_text = "\n".join(full_transcript) - out_path = Path(output_path_str) - tmp_out = out_path.with_name(f"{out_path.stem}.tmp-{uuid.uuid4().hex}{out_path.suffix}") - with tmp_out.open("w", encoding="utf-8") as f: - f.write(transcript_text) - tmp_out.replace(out_path) - mark_job_as_completed(db, job_id, output_filepath_str=output_path_str, preview=transcript_text) - logger.info(f"Transcription for job {job_id} completed.") + logger.info(f"Starting transcription for job {job_id} with model '{model_size}'") + segments_generator, info = model.transcribe(str(input_path), beam_size=5) + logger.info(f"Detected language: {info.language} with probability {info.language_probability:.2f} for a duration of {info.duration:.2f}s") + + + last_update_time = time.time() + + # Use a temporary file to ensure atomic writes. The final file will only appear + # once the transcription is fully and successfully written. + tmp_output_path = output_path.with_name(f"{output_path.stem}.tmp-{uuid.uuid4().hex}{output_path.suffix}") + + # Store a small preview in memory for the final update without holding the whole transcript. + preview_segments = [] + PREVIEW_MAX_LENGTH = 1000 # characters + current_preview_length = 0 + + with tmp_output_path.open("w", encoding="utf-8") as f: + for segment in segments_generator: + segment_text = segment.text.strip() + f.write(segment_text + "\n") + + # Build a small preview + if current_preview_length < PREVIEW_MAX_LENGTH: + preview_segments.append(segment_text) + current_preview_length += len(segment_text) + + + current_time = time.time() + if current_time - last_update_time > DB_POLL_INTERVAL_SECONDS: + last_update_time = current_time + + # Check for cancellation without overwhelming the DB + job_check = get_job(db, job_id) + if job_check and job_check.status == 'cancelled': + logger.info(f"Job {job_id} cancelled during transcription. Stopping.") + # The temporary file will be cleaned up in the finally block + return + + # Update progress + if info.duration > 0: + progress = int((segment.end / info.duration) * 100) + update_job_status(db, job_id, "processing", progress=progress) + + tmp_output_path.replace(output_path) + + transcript_preview = " ".join(preview_segments) + if len(transcript_preview) > PREVIEW_MAX_LENGTH: + transcript_preview = transcript_preview[:PREVIEW_MAX_LENGTH] + "..." 
+ + mark_job_as_completed(db, job_id, output_filepath_str=output_path_str, preview=transcript_preview) + logger.info(f"Transcription for job {job_id} completed successfully.") + except Exception as e: - logger.exception(f"ERROR during transcription for job {job_id}") - update_job_status(db, job_id, "failed", error=f"Transcription failed: {e}") + logger.exception(f"An unexpected error occurred during transcription for job {job_id}") + update_job_status(db, job_id, "failed", error=str(e)) + finally: + # This block executes whether the task succeeded, failed, or was cancelled and returned. + logger.debug(f"Performing cleanup for job {job_id}") + + # Clean up the temporary file if it still exists (e.g., due to cancellation) + if 'tmp_output_path' in locals() and tmp_output_path.exists(): + try: + tmp_output_path.unlink() + logger.debug(f"Removed temporary file: {tmp_output_path}") + except OSError as e: + logger.error(f"Error removing temporary file {tmp_output_path}: {e}") + + # Clean up the original input file try: + # First, ensure we are not deleting from an unexpected directory ensure_path_is_safe(input_path, [PATHS.UPLOADS_DIR, PATHS.CHUNK_TMP_DIR]) input_path.unlink(missing_ok=True) - except Exception: - logger.exception("Failed to cleanup input file after transcription.") - db.close() + logger.debug(f"Removed input file: {input_path}") + except Exception as e: + logger.exception(f"Failed to cleanup input file {input_path} for job {job_id}: {e}") + + if db: + db.close() + + # Send notification last, after all state has been finalized. send_webhook_notification(job_id, app_config, base_url) + @huey.task() def run_tts_task(job_id: str, input_path_str: str, output_path_str: str, model_name: str, tts_settings: dict, app_config: dict, base_url: str): db = SessionLocal() @@ -1069,8 +1361,7 @@ def run_pdf_ocr_task(job_id: str, input_path_str: str, output_path_str: str, ocr force_ocr=ocr_settings.get('force_ocr', True), clean=ocr_settings.get('clean', True), optimize=ocr_settings.get('optimize', 1), - progress_bar=False, - image_dpi=ocr_settings.get('image_dpi', 300)) + progress_bar=False) with open(output_path_str, "rb") as f: reader = pypdf.PdfReader(f) preview = "\n".join(page.extract_text() or "" for page in reader.pages) @@ -1088,148 +1379,313 @@ def run_pdf_ocr_task(job_id: str, input_path_str: str, output_path_str: str, ocr db.close() send_webhook_notification(job_id, app_config, base_url) -def image_adjustment_controller(img, brightness=128, - contrast=200): - - brightness = int((brightness - 0) * (255 - (-255)) / (510 - 0) + (-255)) - contrast = int((contrast - 0) * (127 - (-127)) / (254 - 0) + (-127)) - if brightness != 0: - if brightness > 0: - shadow = brightness - max = 255 - else: - shadow = 0 - max = 255 + brightness - - al_pha = (max - shadow) / 255 - ga_mma = shadow - # The function addWeighted calculates - # the weighted sum of two arrays - cal = cv2.addWeighted(img, al_pha, - img, 0, ga_mma) - else: - cal = img - if contrast != 0: - Alpha = float(131 * (contrast + 127)) / (127 * (131 - contrast)) - Gamma = 127 * (1 - Alpha) - # The function addWeighted calculates - # the weighted sum of two arrays - cal = cv2.addWeighted(cal, Alpha, - cal, 0, Gamma) - return cal - - -def preprocess_for_ocr(image_path: str) -> np.ndarray: - """Loads an image and applies preprocessing steps for OCR.""" - # Read the image using OpenCV - image = cv2.imread(image_path) - - # 1. 
Convert to grayscale - gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) - contrast_img = image_adjustment_controller(gray, brightness=150, contrast=120) - # 2. Binarize the image (Otsu's thresholding is great for this) - # This turns the image into pure black and white - _, binary_image = cv2.threshold(contrast_img, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) - - # 3. Denoise the image (optional but often helpful) - denoised_image = cv2.medianBlur(gray, 3) - - return binary_image - - @huey.task() def run_image_ocr_task(job_id: str, input_path_str: str, output_path_str: str, app_config: dict, base_url: str): - """ - Performs OCR on an image file, first applying preprocessing steps to clean - the image, and then saving the output as a searchable PDF. - """ db = SessionLocal() input_path = Path(input_path_str) + out_path = Path(output_path_str) + try: job = get_job(db, job_id) - if not job or job.status == 'cancelled': - logger.warning(f"OCR job {job_id} was cancelled or not found. Aborting task.") + if not job or job.status == "cancelled": return - - update_job_status(db, job_id, "processing") - logger.info(f"Starting Image to PDF OCR for job {job_id}") - # Apply the preprocessing steps to the input image for better accuracy - logger.info(f"Preprocessing image for job {job_id}...") - preprocessed_image = preprocess_for_ocr(input_path_str) + update_job_status(db, job_id, "processing", progress=10) + logger.info(f"Starting Image OCR for job {job_id} - {input_path}") - # Configure Tesseract for optimal performance. - # '--psm 3' enables automatic page segmentation, which is a robust default. - # '-l eng' specifies English as the language. This should be made dynamic if you support others. - tesseract_config = '--psm 3' - logger.info(f"Running Tesseract with config: '{tesseract_config}' for job {job_id}") - - # Generate a searchable PDF from the preprocessed image data - pdf_bytes = pytesseract.image_to_pdf_or_hocr( - Image.fromarray(preprocessed_image), # Convert numpy array back to PIL Image - extension='pdf', - config=tesseract_config - ) - with open(output_path_str, "wb") as f: - f.write(pdf_bytes) - - # Generate a plain text preview from the same preprocessed image - preview_text = pytesseract.image_to_string( - Image.fromarray(preprocessed_image), - config=tesseract_config - ) - - mark_job_as_completed(db, job_id, output_filepath_str=output_path_str, preview=preview_text) - logger.info(f"Image to PDF OCR for job {job_id} completed successfully.") - - except Exception as e: - logger.exception(f"ERROR during Image to PDF OCR for job {job_id}") - update_job_status(db, job_id, "failed", error=f"Image OCR failed: {e}") - - finally: + # open image and gather frames (support multi-frame TIFF) + try: + pil_img = Image.open(str(input_path)) + except UnidentifiedImageError as e: + raise RuntimeError(f"Cannot identify/open input image: {e}") + + frames = [] + try: + # some images support n_frames (multi-page TIFF); iterate safely + n_frames = getattr(pil_img, "n_frames", 1) + for i in range(n_frames): + pil_img.seek(i) + # copy the frame to avoid problems when the original image object is closed + frames.append(pil_img.convert("RGB").copy()) + except Exception: + # fallback: single frame + frames = [pil_img.convert("RGB")] + + update_job_status(db, job_id, "processing", progress=30) + + pdf_bytes_list = [] + text_parts = [] + for idx, frame in enumerate(frames): + # produce searchable PDF bytes for the frame and plain text as well + try: + pdf_bytes = pytesseract.image_to_pdf_or_hocr(frame, 
extension="pdf") + except TesseractNotFoundError as e: + raise RuntimeError("Tesseract not found. Ensure Tesseract OCR is installed and in PATH.") from e + except Exception as e: + raise RuntimeError(f"Failed to run Tesseract on frame {idx}: {e}") from e + + pdf_bytes_list.append(pdf_bytes) + + # also extract plain text for preview and possible fallback + try: + page_text = pytesseract.image_to_string(frame) + except Exception: + page_text = "" + text_parts.append(page_text) + + # update progress incrementally + prog = 30 + int((idx + 1) / max(1, len(frames)) * 50) + update_job_status(db, job_id, "processing", progress=min(prog, 80)) + + # merge per-page pdfs if multiple frames + final_pdf_bytes = None + if len(pdf_bytes_list) == 1: + final_pdf_bytes = pdf_bytes_list[0] + else: + if _HAS_PYPDF2: + merger = PdfMerger() + for b in pdf_bytes_list: + merger.append(io.BytesIO(b)) + out_buffer = io.BytesIO() + merger.write(out_buffer) + merger.close() + final_pdf_bytes = out_buffer.getvalue() + else: + # PyPDF2 not installed — try a simple concatenation (not valid PDF merge), + # better to fail loudly so user can install PyPDF2; but as a fallback + # write the first page only and include a warning in job preview. + logger.warning("PyPDF2 not available; only the first frame will be written to output PDF.") + final_pdf_bytes = pdf_bytes_list[0] + text_parts.insert(0, "[WARNING] Multiple frames detected but PyPDF2 not available; only first page saved.\n") + + # write out atomically + tmp_out = out_path.with_name(f"{out_path.stem}.tmp-{uuid.uuid4().hex}{out_path.suffix or '.pdf'}") + try: + tmp_out.parent.mkdir(parents=True, exist_ok=True) + with tmp_out.open("wb") as f: + f.write(final_pdf_bytes) + tmp_out.replace(out_path) + except Exception as e: + raise RuntimeError(f"Failed writing output PDF to {out_path}: {e}") from e + + # create a preview from the recognized text (limit length) + full_text = "\n\n".join(text_parts).strip() + preview = full_text[:1000] + ("…" if len(full_text) > 1000 else "") + + mark_job_as_completed(db, job_id, output_filepath_str=str(out_path), preview=preview) + update_job_status(db, job_id, "completed", progress=100) + logger.info(f"Image OCR for job {job_id} completed. 
Output: {out_path}") + + except TesseractNotFoundError: + logger.exception(f"Tesseract not found for job {job_id}") + update_job_status(db, job_id, "failed", error="Image OCR failed: Tesseract not found on server.") + except Exception as e: + logger.exception(f"ERROR during Image OCR for job {job_id}: {e}") + update_job_status(db, job_id, "failed", error=f"Image OCR failed: {e}") + finally: + # cleanup input file (but only if it lives in allowed uploads dir) try: - # Clean up the original uploaded file ensure_path_is_safe(input_path, [PATHS.UPLOADS_DIR]) input_path.unlink(missing_ok=True) except Exception: - logger.exception(f"Failed to cleanup input file for job {job_id}.") - - db.close() - send_webhook_notification(job_id, app_config, base_url) + logger.exception("Failed to cleanup input file after Image OCR.") + try: + db.close() + except Exception: + logger.exception("Failed to close DB session after Image OCR.") + # send webhook regardless of success/failure (keeps original behavior) + try: + send_webhook_notification(job_id, app_config, base_url) + except Exception: + logger.exception("Failed to send webhook notification after Image OCR.") @huey.task() -def run_conversion_task(job_id: str, input_path_str: str, output_path_str: str, tool: str, task_key: str, conversion_tools_config: dict, app_config: dict, base_url: str): +def run_conversion_task(job_id: str, + input_path_str: str, + output_path_str: str, + tool: str, + task_key: str, + conversion_tools_config: dict, + app_config: dict, + base_url: str): + """ + Drop-in replacement for conversion task. + - Uses improved run_command for short operations (resource-limited). + - Uses cancellable Popen runner for long-running conversion to respond to DB cancellations. + """ db = SessionLocal() input_path = Path(input_path_str) output_path = Path(output_path_str) - temp_input_file = None - temp_output_file = None + + # localize helpers for speed + _get_job = get_job + _update_job_status = update_job_status + _validate_build = validate_and_build_command + _mark_completed = mark_job_as_completed + _ensure_safe = ensure_path_is_safe + _send_webhook = send_webhook_notification + + temp_input_file: Optional[Path] = None + temp_output_file: Optional[Path] = None + + POLL_INTERVAL = 1.0 + STDERR_SNIPPET = 4000 + + def _parse_task_key(tool_name: str, tk: str, tool_cfg: dict, mapping: dict): + try: + if tool_name.startswith("ghostscript"): + parts = tk.split("_", 1) + device = parts[0] if parts and parts[0] else "" + setting = parts[1] if len(parts) > 1 else "" + mapping.update({"device": device, "dpi": setting, "preset": setting}) + elif tool_name == "pngquant": + parts = tk.split("_", 1) + quality_key = parts[1] if len(parts) > 1 else (parts[0] if parts else "mq") + quality_map = {"hq": "80-95", "mq": "65-80", "fast": "65-80"} + speed_map = {"hq": "1", "mq": "3", "fast": "11"} + mapping.update({"quality": quality_map.get(quality_key, "65-80"), + "speed": speed_map.get(quality_key, "3")}) + elif tool_name == "sox": + parts = tk.split("_") + if len(parts) >= 3: + rate_token = parts[-2] + depth_token = parts[-1] + elif len(parts) == 2: + rate_token = parts[-1] + depth_token = "" + else: + rate_token = "" + depth_token = "" + rate_val = rate_token.replace("k", "000") if rate_token else "" + if depth_token: + depth_val = ('-b' + depth_token.replace('b', '')) if 'b' in depth_token else depth_token + else: + depth_val = '' + mapping.update({"samplerate": rate_val, "bitdepth": depth_val}) + elif tool_name == "mozjpeg": + parts = tk.split("_", 1) + 
quality_token = parts[1] if len(parts) > 1 else (parts[0] if parts else "")
+                quality = quality_token.replace("q", "") if quality_token else ""
+                mapping.update({"quality": quality})
+            elif tool_name == "libreoffice":
+                target_ext = output_path.suffix.lstrip('.')
+                filter_val = tool_cfg.get("filters", {}).get(target_ext, target_ext)
+                mapping["filter"] = filter_val
+        except Exception:
+            logger.exception("Failed to parse task_key for tool %s; continuing with defaults.", tool_name)

+    def _run_cancellable_command(command: List[str], timeout: int):
+        """
+        Run the command with Popen and poll the DB for cancellation. Enforce the timeout.
+        Returns a CompletedProcess on success. Raises Exception on failure/timeout/cancel.
+        """
+        preexec = globals().get("_limit_resources_preexec", None)
+        logger.debug("Launching conversion subprocess: %s", " ".join(shlex.quote(c) for c in command))
+        proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, preexec_fn=preexec)
+
+        start = time.monotonic()
+        STDERR_LIMIT = STDERR_SNIPPET
+
+        try:
+            while True:
+                ret = proc.poll()
+                # Check job status
+                job_check = _get_job(db, job_id)
+                if job_check is None:
+                    logger.warning("Job %s disappeared; killing conversion process.", job_id)
+                    try:
+                        proc.kill()
+                    except Exception:
+                        pass
+                    raise Exception("Job disappeared during conversion")
+                if job_check.status == "cancelled":
+                    logger.info("Job %s cancelled; terminating conversion process.", job_id)
+                    try:
+                        proc.kill()
+                    except Exception:
+                        pass
+                    raise Exception("Conversion cancelled")
+
+                if ret is not None:
+                    # Process finished: communicate() drains and closes both pipes,
+                    # so no further reads on proc.stderr are needed (or possible).
+                    try:
+                        out, err = proc.communicate(timeout=2)
+                    except Exception:
+                        out, err = "", ""
+                    if err and len(err) > STDERR_LIMIT:
+                        err = err[-STDERR_LIMIT:]
+                    if ret != 0:
+                        msg = (err or "")[:STDERR_LIMIT]
+                        raise Exception(f"Conversion command failed (rc={ret}): {msg}")
+                    return subprocess.CompletedProcess(args=command, returncode=ret, stdout=out, stderr=err)
+
+                # timeout check
+                elapsed = time.monotonic() - start
+                if timeout and elapsed > timeout:
+                    logger.warning("Conversion command timed out after %ss; terminating.", timeout)
+                    try:
+                        proc.kill()
+                    except Exception:
+                        pass
+                    raise Exception("Conversion command timed out")
+
+                time.sleep(POLL_INTERVAL)
+        finally:
+            try:
+                if proc.stdout:
+                    proc.stdout.close()
+                if proc.stderr:
+                    proc.stderr.close()
+            except Exception:
+                pass
+
     try:
-        job = get_job(db, job_id)
-        if not job or job.status == 'cancelled':
+        job = _get_job(db, job_id)
+        if not job:
+            logger.warning("Job %s not found; aborting conversion.", job_id)
             return
-        update_job_status(db, job_id, "processing", progress=25)
-        logger.info(f"Starting conversion for job {job_id} using {tool} with task {task_key}")
+        if job.status == "cancelled":
+            logger.info("Job %s already cancelled; aborting conversion.", job_id)
+            return
+
+        _update_job_status(db, job_id, "processing", progress=25)
+        logger.info("Starting conversion for job %s using %s with task %s", job_id, tool, task_key)
+
         tool_config = conversion_tools_config.get(tool)
         if not tool_config:
            raise ValueError(f"Unknown conversion tool: {tool}")
         current_input_path = input_path
+        # Pre-conversion step for mozjpeg uses the improved run_command (resource-limited)
         if tool == "mozjpeg":
             temp_input_file = input_path.with_suffix('.temp.ppm')
-            logger.info(f"Pre-converting for MozJPEG: {input_path} -> 
{temp_input_file}") - pre_conv_cmd = ["vips", "copy", str(input_path), str(temp_input_file)] - pre_conv_result = subprocess.run(pre_conv_cmd, capture_output=True, text=True, check=False, timeout=tool_config.get("timeout", 300)) - if pre_conv_result.returncode != 0: - err = (pre_conv_result.stderr or "")[:4000] - raise Exception(f"MozJPEG pre-conversion to PPM failed: {err}") + logger.info("Pre-converting for MozJPEG: %s -> %s", input_path, temp_input_file) + vips_bin = shutil.which("vips") or "vips" + pre_conv_cmd = [vips_bin, "copy", str(input_path), str(temp_input_file)] + try: + run_command(pre_conv_cmd, timeout=int(tool_config.get("timeout", 300))) + except Exception as ex: + err_msg = str(ex) + short_err = (err_msg or "")[:STDERR_SNIPPET] + logger.exception("MozJPEG pre-conversion failed: %s", short_err) + raise Exception(f"MozJPEG pre-conversion to PPM failed: {short_err}") current_input_path = temp_input_file - update_job_status(db, job_id, "processing", progress=50) + _update_job_status(db, job_id, "processing", progress=50) + # Prepare atomic temp output on same FS + output_path.parent.mkdir(parents=True, exist_ok=True) temp_output_file = output_path.with_name(f"{output_path.stem}.tmp-{uuid.uuid4().hex}{output_path.suffix}") + mapping = { "input": str(current_input_path), "output": str(temp_output_file), @@ -1237,71 +1693,250 @@ def run_conversion_task(job_id: str, input_path_str: str, output_path_str: str, "output_ext": output_path.suffix.lstrip('.'), } - if tool.startswith("ghostscript"): - parts = task_key.split('_', 1) - device = parts[0] if parts else "" - setting = parts[1] if len(parts) > 1 else "" - mapping.update({"device": device, "dpi": setting, "preset": setting}) - elif tool == "pngquant": - _, quality_key = task_key.split('_') - quality_map = {"hq": "80-95", "mq": "65-80", "fast": "65-80"} - speed_map = {"hq": "1", "mq": "3", "fast": "11"} - mapping.update({"quality": quality_map.get(quality_key, "65-80"), "speed": speed_map.get(quality_key, "3")}) - elif tool == "sox": - rate, depth = '', '' - try: - _, rate, depth = task_key.split('_') - depth = ('-b' + depth.replace('b', '')) if 'b' in depth else '16b' - except: - _, rate = task_key.split('_') - depth = '' - rate = rate.replace('k', '000') if 'k' in rate else rate - mapping.update({"samplerate": rate, "bitdepth": depth}) - elif tool == "mozjpeg": - _, quality = task_key.split('_') - quality = quality.replace('q', '') - mapping.update({"quality": quality}) - elif tool == "libreoffice": - target_ext = output_path.suffix.lstrip('.') - filter_val = tool_config.get("filters", {}).get(target_ext, target_ext) - mapping["filter"] = filter_val + _parse_task_key(tool, task_key, tool_config, mapping) - command_template_str = tool_config["command_template"] - command = validate_and_build_command(command_template_str, mapping) - logger.info(f"Executing command: {' '.join(command)}") + command_template_str = tool_config.get("command_template") + if not command_template_str: + raise ValueError(f"Tool '{tool}' missing 'command_template' in configuration.") + command = _validate_build(command_template_str, mapping) + if not isinstance(command, (list, tuple)) or not command: + raise ValueError("validate_and_build_command must return a non-empty list/tuple command.") + command = [str(x) for x in command] - run_command(command, timeout=tool_config.get("timeout", 300)) + logger.info("Executing command: %s", " ".join(shlex.quote(c) for c in command)) + # Run main conversion in cancellable manner + timeout_val = 
int(tool_config.get("timeout", 300))
+
+        # Run the main conversion via the cancellable runner defined above
+        _run_cancellable_command(command, timeout=timeout_val)
+
+        # If successful and the temp output exists, move it into place atomically
         if temp_output_file and temp_output_file.exists():
             temp_output_file.replace(output_path)
-        mark_job_as_completed(db, job_id, output_filepath_str=str(output_path), preview=f"Successfully converted file.")
-        logger.info(f"Conversion for job {job_id} completed.")
+        _mark_completed(db, job_id, output_filepath_str=str(output_path), preview="Successfully converted file.")
+        logger.info("Conversion for job %s completed.", job_id)
+
     except Exception as e:
-        logger.exception(f"ERROR during conversion for job {job_id}")
-        update_job_status(db, job_id, "failed", error=f"Conversion failed: {e}")
-    finally:
+        logger.exception("ERROR during conversion for job %s: %s", job_id, e)
         try:
-            ensure_path_is_safe(input_path, [PATHS.UPLOADS_DIR, PATHS.CHUNK_TMP_DIR])
+            _update_job_status(db, job_id, "failed", error=f"Conversion failed: {e}")
+        except Exception:
+            logger.exception("Failed to update job status to failed after conversion error.")
+    finally:
+        # clean main input
+        try:
+            _ensure_safe(input_path, [PATHS.UPLOADS_DIR, PATHS.CHUNK_TMP_DIR])
             input_path.unlink(missing_ok=True)
         except Exception:
             logger.exception("Failed to cleanup main input file after conversion.")
+
+        # cleanup temp input
         if temp_input_file:
             try:
                 temp_input_file_path = Path(temp_input_file)
-                ensure_path_is_safe(temp_input_file_path, [PATHS.UPLOADS_DIR, PATHS.PROCESSED_DIR])
+                _ensure_safe(temp_input_file_path, [PATHS.UPLOADS_DIR, PATHS.PROCESSED_DIR])
                 temp_input_file_path.unlink(missing_ok=True)
             except Exception:
                 logger.exception("Failed to cleanup temp input file after conversion.")
+
         if temp_output_file:
             try:
                 temp_output_file_path = Path(temp_output_file)
-                ensure_path_is_safe(temp_output_file_path, [PATHS.UPLOADS_DIR, PATHS.PROCESSED_DIR])
+                _ensure_safe(temp_output_file_path, [PATHS.UPLOADS_DIR, PATHS.PROCESSED_DIR])
                 temp_output_file_path.unlink(missing_ok=True)
             except Exception:
                 logger.exception("Failed to cleanup temp output file after conversion.")
+
+        try:
+            db.close()
+        except Exception:
+            logger.exception("Failed to close DB session after conversion.")
+
+        try:
+            gc.collect()
+        except Exception:
+            pass
+
+        try:
+            _send_webhook(job_id, app_config, base_url)
+        except Exception:
+            logger.exception("Failed to send webhook notification after conversion.")
+
+def dispatch_single_file_job(original_filename: str, input_filepath: str, task_type: str, user: dict, db: Session, app_config: Dict, base_url: str, job_id: str | None = None, options: Dict | None = None, parent_job_id: str | None = None):
+    """Helper to create and dispatch a job for a single file."""
+    if options is None:
+        options = {}
+
+    # If no job_id is passed, generate one. This is for sub-tasks from ZIPs.
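+    # Sub-jobs dispatched from a ZIP batch carry parent_job_id so the periodic
+    # update_unzip_job_progress task can roll their statuses up into the parent job.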
+ if job_id is None: + job_id = uuid.uuid4().hex + + safe_filename = secure_filename(original_filename) + final_path = Path(input_filepath) + + # Ensure the input file exists before creating a job + if not final_path.exists(): + logger.error(f"Input file does not exist, cannot dispatch job: {input_filepath}") + return + + job_data = JobCreate( + id=job_id, user_id=user['sub'], task_type=task_type, + original_filename=original_filename, input_filepath=str(final_path), + input_filesize=final_path.stat().st_size, + parent_job_id=parent_job_id + ) + + if task_type == "transcription": + stem = Path(safe_filename).stem + processed_path = PATHS.PROCESSED_DIR / f"{stem}_{job_id}.txt" + job_data.processed_filepath = str(processed_path) + create_job(db=db, job=job_data) + run_transcription_task(job_data.id, str(final_path), str(processed_path), options.get("model_size", "base"), app_config.get("transcription_settings", {}).get("whisper", {}), app_config, base_url) + elif task_type == "tts": + tts_config = app_config.get("tts_settings", {}) + stem = Path(safe_filename).stem + processed_path = PATHS.PROCESSED_DIR / f"{stem}_{job_id}.wav" + job_data.processed_filepath = str(processed_path) + create_job(db=db, job=job_data) + run_tts_task(job_data.id, str(final_path), str(processed_path), options.get("model_name"), tts_config, app_config, base_url) + elif task_type == "ocr": + stem, suffix = Path(safe_filename).stem, Path(safe_filename).suffix.lower() + IMAGE_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.tiff', '.tif', '.bmp', '.webp'} + if suffix not in IMAGE_EXTENSIONS and suffix != '.pdf': + logger.warning(f"Skipping unsupported file type for OCR: {original_filename}") + # Clean up the orphaned file from the zip extraction + final_path.unlink(missing_ok=True) + return + processed_path = PATHS.PROCESSED_DIR / f"{stem}_{job_id}.pdf" + job_data.processed_filepath = str(processed_path) + create_job(db=db, job=job_data) + if suffix in IMAGE_EXTENSIONS: + run_image_ocr_task(job_data.id, str(final_path), str(processed_path), app_config, base_url) + else: + run_pdf_ocr_task(job_data.id, str(final_path), str(processed_path), app_config.get("ocr_settings", {}).get("ocrmypdf", {}), app_config, base_url) + elif task_type == "conversion": + try: + tool, task_key = options.get("output_format").split('_', 1) + except (AttributeError, ValueError): + logger.error(f"Invalid or missing output_format for conversion of {original_filename}") + final_path.unlink(missing_ok=True) + return + original_stem = Path(safe_filename).stem + target_ext = task_key.split('_')[0] + if tool == "ghostscript_pdf": target_ext = "pdf" + processed_path = PATHS.PROCESSED_DIR / f"{original_stem}_{job_id}.{target_ext}" + job_data.processed_filepath = str(processed_path) + create_job(db=db, job=job_data) + run_conversion_task(job_data.id, str(final_path), str(processed_path), tool, task_key, app_config.get("conversion_tools", {}), app_config, base_url) + else: + logger.error(f"Invalid task type '{task_type}' for file {original_filename}") + final_path.unlink(missing_ok=True) + +@huey.task() +def unzip_and_dispatch_task(job_id: str, input_path_str: str, sub_task_type: str, sub_task_options: dict, user: dict, app_config: dict, base_url: str): + db = SessionLocal() + input_path = Path(input_path_str) + unzip_dir = PATHS.UPLOADS_DIR / f"unzipped_{job_id}" + try: + if not zipfile.is_zipfile(input_path): + raise ValueError("Uploaded file is not a valid ZIP archive.") + unzip_dir.mkdir() + with zipfile.ZipFile(input_path, 'r') as zip_ref: + 
zip_ref.extractall(unzip_dir)
+
+        file_count = 0
+        for extracted_file_path in unzip_dir.rglob('*'):
+            if extracted_file_path.is_file():
+                file_count += 1
+                dispatch_single_file_job(
+                    original_filename=extracted_file_path.name,
+                    input_filepath=str(extracted_file_path),
+                    task_type=sub_task_type,
+                    options=sub_task_options,
+                    user=user,
+                    db=db,
+                    app_config=app_config,
+                    base_url=base_url,
+                    parent_job_id=job_id
+                )
+        if file_count > 0:
+            # Mark the parent job as processing; the periodic task will complete it
+            update_job_status(db, job_id, "processing", progress=0)
+        else:
+            # No files found, mark as completed with a note
+            mark_job_as_completed(db, job_id, preview="ZIP archive was empty. No sub-jobs created.")
+
+    except Exception as e:
+        logger.exception(f"ERROR during ZIP processing for job {job_id}")
+        update_job_status(db, job_id, "failed", error=f"Failed to process ZIP file: {e}")
+        # If unzipping fails, clean up the directory
+        if unzip_dir.exists():
+            shutil.rmtree(unzip_dir)
+    finally:
+        try:
+            # CRITICAL FIX: Only delete the original ZIP file.
+            # Do NOT delete unzip_dir here, as the sub-tasks need the extracted files.
+            ensure_path_is_safe(input_path, [PATHS.UPLOADS_DIR, PATHS.CHUNK_TMP_DIR])
+            input_path.unlink(missing_ok=True)
+        except Exception:
+            logger.exception("Failed to cleanup original ZIP file.")
+        db.close()
+
+@huey.periodic_task(crontab(minute='*/1'))  # Runs every minute
+def update_unzip_job_progress():
+    """Periodically checks and updates the progress of parent 'unzip' jobs."""
+    db = SessionLocal()
+    try:
+        # Find all 'unzip' jobs that are still marked as 'processing'
+        parent_jobs_to_check = db.query(Job).filter(
+            Job.task_type == 'unzip',
+            Job.status == 'processing'
+        ).all()
+
+        if not parent_jobs_to_check:
+            return  # Nothing to do
+
+        logger.info(f"Checking progress for {len(parent_jobs_to_check)} active batch jobs.")
+
+        for parent_job in parent_jobs_to_check:
+            # Find all children of this parent job
+            child_jobs = db.query(Job).filter(Job.parent_job_id == parent_job.id).all()
+            total_children = len(child_jobs)
+
+            if total_children == 0:
+                # This shouldn't happen if unzip_and_dispatch_task works, but as a safeguard:
+                mark_job_as_completed(db, parent_job.id, preview="Batch job completed with no sub-tasks.")
+                continue
+
+            finished_children = sum(
+                1 for child in child_jobs
+                if child.status in ('completed', 'failed', 'cancelled')
+            )
+
+            # Calculate and update progress (total_children > 0 is guaranteed here)
+            progress = int((finished_children / total_children) * 100)
+
+            if finished_children == total_children:
+                # All children are done, mark the parent as completed
+                failed_count = sum(1 for child in child_jobs if child.status == 'failed')
+                preview = f"Batch processing complete. {total_children - failed_count}/{total_children} tasks succeeded."
+                if failed_count > 0:
+                    preview += f" ({failed_count} failed)."
+                mark_job_as_completed(db, parent_job.id, preview=preview)
+                logger.info(f"Batch job {parent_job.id} marked as completed.")
+            else:
+                # Update the progress only if it has changed
+                if parent_job.progress != progress:
+                    update_job_status(db, parent_job.id, 'processing', progress=progress)
+
+    except Exception as e:
+        logger.exception(f"Error in periodic task update_unzip_job_progress: {e}")
+    finally:
         db.close()
-        send_webhook_notification(job_id, app_config, base_url)
 
 # --------------------------------------------------------------------------------
 # --- 5. 
FASTAPI APPLICATION @@ -1541,75 +2176,33 @@ async def finalize_upload(request: Request, payload: FinalizeUploadPayload, user if payload.callback_url and not is_allowed_callback_url(payload.callback_url, webhook_config.get("allowed_callback_urls", [])): raise HTTPException(status_code=400, detail="Provided callback_url is not allowed.") - job_id = uuid.uuid4().hex safe_filename = secure_filename(payload.original_filename) final_path = PATHS.UPLOADS_DIR / f"{Path(safe_filename).stem}_{job_id}{Path(safe_filename).suffix}" await _stitch_chunks(temp_dir, final_path, payload.total_chunks) base_url = str(request.base_url) - job_data = JobCreate( - id=job_id, user_id=user['sub'], task_type=payload.task_type, - original_filename=payload.original_filename, input_filepath=str(final_path), - input_filesize=final_path.stat().st_size - ) - # --- Task Dispatching for UI chunked uploads --- - if payload.task_type == "transcription": - stem = Path(safe_filename).stem - processed_path = PATHS.PROCESSED_DIR / f"{stem}_{job_id}.txt" - job_data.processed_filepath = str(processed_path) + if Path(safe_filename).suffix.lower() == '.zip': + job_data = JobCreate( + id=job_id, user_id=user['sub'], task_type="unzip", + original_filename=payload.original_filename, input_filepath=str(final_path), + input_filesize=final_path.stat().st_size + ) create_job(db=db, job=job_data) - run_transcription_task(job_data.id, str(final_path), str(processed_path), payload.model_size, APP_CONFIG.get("transcription_settings", {}).get("whisper", {}), APP_CONFIG, base_url) - elif payload.task_type == "tts": - tts_config = APP_CONFIG.get("tts_settings", {}) - stem = Path(safe_filename).stem - processed_path = PATHS.PROCESSED_DIR / f"{stem}_{job_id}.wav" - job_data.processed_filepath = str(processed_path) - create_job(db=db, job=job_data) - run_tts_task(job_data.id, str(final_path), str(processed_path), payload.model_name, tts_config, APP_CONFIG, base_url) - elif payload.task_type == "ocr": - stem, suffix = Path(safe_filename).stem, Path(safe_filename).suffix.lower() - IMAGE_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.tiff', '.tif', '.bmp', '.webp'} - - # 1. Validate file type before creating a job - if suffix not in IMAGE_EXTENSIONS and suffix != '.pdf': - final_path.unlink(missing_ok=True) # Clean up the uploaded file - raise HTTPException( - status_code=415, - detail=f"Unsupported file type for OCR: '{suffix}'. Please upload a PDF or a supported image." - ) - - # 2. Set output path to always be a PDF - processed_path = PATHS.PROCESSED_DIR / f"{stem}_{job_id}.pdf" - job_data.processed_filepath = str(processed_path) - create_job(db=db, job=job_data) - - # 3. 
Dispatch to the correct task based on file type - if suffix in IMAGE_EXTENSIONS: - # Call the existing image task, which is now modified to produce a PDF - run_image_ocr_task(job_data.id, str(final_path), str(processed_path), APP_CONFIG, base_url) - else: # It must be a .pdf due to the earlier check - run_pdf_ocr_task(job_data.id, str(final_path), str(processed_path), APP_CONFIG.get("ocr_settings", {}).get("ocrmypdf", {}), APP_CONFIG, base_url) - elif payload.task_type == "conversion": - try: - tool, task_key = payload.output_format.split('_', 1) - except Exception: - final_path.unlink(missing_ok=True) - raise HTTPException(status_code=400, detail="Invalid output_format for conversion.") - original_stem = Path(safe_filename).stem - target_ext = task_key.split('_')[0] - if tool == "ghostscript_pdf": target_ext = "pdf" - processed_path = PATHS.PROCESSED_DIR / f"{original_stem}_{job_id}.{target_ext}" - job_data.processed_filepath = str(processed_path) - create_job(db=db, job=job_data) - run_conversion_task(job_data.id, str(final_path), str(processed_path), tool, task_key, APP_CONFIG.get("conversion_tools", {}), APP_CONFIG, base_url) + sub_task_options = { + "model_size": payload.model_size, + "model_name": payload.model_name, + "output_format": payload.output_format + } + unzip_and_dispatch_task(job_id, str(final_path), payload.task_type, sub_task_options, user, APP_CONFIG, base_url) else: - final_path.unlink(missing_ok=True) - raise HTTPException(status_code=400, detail="Invalid task type.") + options = {"model_size": payload.model_size, "model_name": payload.model_name, "output_format": payload.output_format} + dispatch_single_file_job(payload.original_filename, str(final_path), payload.task_type, user, db, APP_CONFIG, base_url, job_id=job_id, options=options) return {"job_id": job_id, "status": "pending"} + # --- LEGACY DIRECT-UPLOAD ROUTES (kept for compatibility) --- @app.post("/transcribe-audio", status_code=status.HTTP_202_ACCEPTED) async def submit_audio_transcription( @@ -1691,7 +2284,7 @@ async def submit_image_ocr(request: Request, file: UploadFile = File(...), db: S file_ext = Path(safe_basename).suffix unique_filename = f"{Path(safe_basename).stem}_{job_id}{file_ext}" upload_path = PATHS.UPLOADS_DIR / unique_filename - processed_path = PATHS.PROCESSED_DIR / f"{Path(safe_basename).stem}_{job_id}.txt" + processed_path = PATHS.PROCESSED_DIR / f"{Path(safe_basename).stem}_{job_id}.pdf" input_size = await save_upload_file(file, upload_path) base_url = str(request.base_url) @@ -2092,6 +2685,64 @@ async def download_file(filename: str, db: Session = Depends(get_db), user: dict download_filename = Path(job.original_filename).stem + Path(job.processed_filepath).suffix return FileResponse(path=file_path, filename=download_filename, media_type="application/octet-stream") +@app.post("/download/batch", response_class=StreamingResponse) +async def download_batch(payload: JobSelection, db: Session = Depends(get_db), user: dict = Depends(require_user)): + job_ids = payload.job_ids + if not job_ids: + raise HTTPException(status_code=400, detail="No job IDs provided.") + + zip_buffer = BytesIO() + with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED, False) as zip_file: + for job_id in job_ids: + job = get_job(db, job_id) + if job and job.user_id == user['sub'] and job.status == 'completed' and job.processed_filepath: + file_path = ensure_path_is_safe(Path(job.processed_filepath), [PATHS.PROCESSED_DIR]) + if file_path.exists(): + download_filename = 
f"{Path(job.original_filename).stem}_{job_id}{file_path.suffix}" + zip_file.write(file_path, arcname=download_filename) + + zip_buffer.seek(0) + return StreamingResponse(zip_buffer, media_type="application/x-zip-compressed", headers={ + 'Content-Disposition': f'attachment; filename="file-wizard-batch-{uuid.uuid4().hex[:8]}.zip"' + }) + +@app.get("/download/zip-batch/{job_id}", response_class=StreamingResponse) +async def download_zip_batch(job_id: str, db: Session = Depends(get_db), user: dict = Depends(require_user)): + """Downloads all processed files from a ZIP upload batch as a new ZIP file.""" + parent_job = get_job(db, job_id) + if not parent_job or parent_job.user_id != user['sub']: + raise HTTPException(status_code=404, detail="Parent job not found.") + if parent_job.task_type != 'unzip': + raise HTTPException(status_code=400, detail="This job is not a batch upload.") + + child_jobs = db.query(Job).filter(Job.parent_job_id == job_id, Job.status == 'completed').all() + if not child_jobs: + raise HTTPException(status_code=404, detail="No completed sub-jobs found for this batch.") + + zip_buffer = BytesIO() + with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED, False) as zip_file: + files_added = 0 + for job in child_jobs: + if job.processed_filepath: + file_path = ensure_path_is_safe(Path(job.processed_filepath), [PATHS.PROCESSED_DIR]) + if file_path.exists(): + # Create a more user-friendly name inside the zip + download_filename = f"{Path(job.original_filename).stem}{file_path.suffix}" + zip_file.write(file_path, arcname=download_filename) + files_added += 1 + + if files_added == 0: + raise HTTPException(status_code=404, detail="No processed files found for the completed sub-jobs.") + + zip_buffer.seek(0) + + # Generate a filename for the download + batch_filename = f"{Path(parent_job.original_filename).stem}_processed.zip" + + return StreamingResponse(zip_buffer, media_type="application/x-zip-compressed", headers={ + 'Content-Disposition': f'attachment; filename="{batch_filename}"' + }) + @app.get("/health") async def health(): try: @@ -2104,4 +2755,4 @@ async def health(): @app.get('/favicon.ico', include_in_schema=False) async def favicon(): - return FileResponse(str(PATHS.BASE_DIR / 'static' / 'favicon.png')) + return FileResponse(str(PATHS.BASE_DIR / 'static' / 'favicon.png')) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index d46d3c7..10744f8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -31,6 +31,7 @@ python-multipart markitdown[pdf,docx,pptx,xlsx,outlook]==0.1.3 PyPDF2==3.0.1 mechanize +html5_parser css_parser #PyQt6==6.9.1 # uncomment for calibre, it uses the webengine #PyQt6-Qt6==6.9.2 diff --git a/static/css/style.css b/static/css/style.css index 8c0466e..d2884c4 100644 --- a/static/css/style.css +++ b/static/css/style.css @@ -93,7 +93,7 @@ body { /* Container */ .container { width: 100%; - max-width: 1280px; + max-width: 92vw; margin: 0 auto; background: var(--card-bg); border-radius: 10px; @@ -329,6 +329,15 @@ input[type="file"] { text-align: center; color: var(--muted-text); font-size: 1.25rem; + + margin: 0; + text-align: left; +} + +.history-header { + display: flex; + justify-content: space-between; + align-items: center; margin-bottom: 1rem; border-top: 1px solid var(--divider-color); padding-top: 2rem; @@ -370,18 +379,29 @@ input[type="file"] { } .cell-value { - max-width: 10em; - text-wrap: wrap; - overflow: scroll; + display: flex; + align-items: center; + max-width: 20em; + overflow: hidden; + text-overflow: 
ellipsis; +} + +.status-cell-value { + flex-direction: column; + align-items: flex-start; + gap: 4px; +} + +.file-cell-content { + white-space: nowrap; + overflow: hidden; + text-overflow: ellipsis; } #job-table td[data-label="File"], #job-table td[data-label="Task"] { - overflow: scroll; text-overflow: ellipsis; - text-wrap: wrap; - max-width: 15em; } .action-col { @@ -439,6 +459,12 @@ input[type="file"] { transition: width 0.5s ease-in-out; } + +.select-col { + width: 20px; + text-align: center; +} + .progress-bar.indeterminate { width: 100%; background: linear-gradient( @@ -468,6 +494,12 @@ input[type="file"] { border: none; cursor: pointer; } + +#download-selected-btn { + width: auto; + white-space: nowrap; + } + .download-button { background-color: var(--success-color); color: #00160b; @@ -703,23 +735,53 @@ body.dragging .drag-overlay { text-align: right; min-width: 0; word-break: break-all; - overflow: scroll; - max-width: 100em; } - .cell-value { + .cell-value, .file-cell-content { min-width: 0; - max-width: 20em; - text-wrap: nowrap; - overflow: scroll; - } - - - #job-table td[data-label="File"], - #job-table td[data-label="Task"] { - overflow: scroll; - text-overflow: ellipsis; - text-wrap: nowrap; - max-width: 100em; + max-width: 50vw; /* Adjust for smaller screens */ } } + +/* --- Collapsible Job Rows --- */ +tr.parent-job { + cursor: pointer; +} + +.expander-arrow { + content: ''; + display: inline-block; + margin-right: 0.7em; + transition: transform 0.2s ease-in-out; + width: 0.7em; + height: 0.7em; + background-image: url("data:image/svg+xml,%3csvg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 16 16'%3e%3cpath fill='none' stroke='%239aa4ad' stroke-linecap='round' stroke-linejoin='round' stroke-width='2' d='M2 5l6 6 6-6'/%3e%3c/svg%3e"); + background-repeat: no-repeat; + background-position: center; + background-size: contain; + transform: rotate(-90deg); + flex-shrink: 0; +} + +tr.parent-job.sub-jobs-visible .expander-arrow { + transform: rotate(0deg); +} + +tr.sub-job { + display: none; /* Initially hidden */ + background-color: rgba(255, 255, 255, 0.03); +} + +tr.sub-job.is-visible { + display: table-row; /* Show on desktop */ +} + +tr.sub-job td[data-label="File"] .cell-value { + padding-left: 1.7em; +} + +@media (max-width: 768px) { + tr.sub-job.is-visible { + display: block; /* Show on mobile */ + } +} \ No newline at end of file diff --git a/static/favicon.png b/static/favicon.png index 6613b78..aacf33f 100644 Binary files a/static/favicon.png and b/static/favicon.png differ diff --git a/static/js/script.js b/static/js/script.js index 7b8da3a..06ac803 100644 --- a/static/js/script.js +++ b/static/js/script.js @@ -42,6 +42,8 @@ document.addEventListener('DOMContentLoaded', () => { const startTranscriptionBtn = document.getElementById('start-transcription-btn'); const startTtsBtn = document.getElementById('start-tts-btn'); + const downloadSelectedBtn = document.getElementById('download-selected-btn'); + const selectAllJobsCheckbox = document.getElementById('select-all-jobs'); const jobListBody = document.getElementById('job-list-body'); // Drag and Drop Elements @@ -495,27 +497,100 @@ document.addEventListener('DOMContentLoaded', () => { } } + function handleSelectionChange() { + const selectedCheckboxes = jobListBody.querySelectorAll('.job-checkbox:checked'); + downloadSelectedBtn.disabled = selectedCheckboxes.length === 0; + + const allCheckboxes = jobListBody.querySelectorAll('.job-checkbox'); + selectAllJobsCheckbox.checked = allCheckboxes.length > 0 && 
selectedCheckboxes.length === allCheckboxes.length; + } + + async function handleBatchDownload() { + const selectedIds = Array.from(jobListBody.querySelectorAll('.job-checkbox:checked')).map(cb => cb.value); + if (selectedIds.length === 0) return; + + downloadSelectedBtn.disabled = true; + downloadSelectedBtn.textContent = 'Zipping...'; + + try { + const response = await authFetch('/download/batch', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ job_ids: selectedIds }) + }); + if (!response.ok) throw new Error('Batch download failed.'); + + const blob = await response.blob(); + const url = window.URL.createObjectURL(blob); + const a = document.createElement('a'); + a.style.display = 'none'; + a.href = url; + a.download = `file-wizard-batch-${Date.now()}.zip`; + document.body.appendChild(a); + a.click(); + window.URL.revokeObjectURL(url); + } catch (error) { + console.error("Batch download error:", error); + alert("Could not download files. Please try again."); + } finally { + downloadSelectedBtn.disabled = false; + downloadSelectedBtn.textContent = 'Download Selected as ZIP'; + } + } + async function loadInitialJobs() { try { const response = await authFetch('/jobs'); if (!response.ok) throw new Error('Failed to fetch jobs.'); - const jobs = await response.json(); + let jobs = await response.json(); + + // Sort jobs so parents come before children + jobs.sort((a, b) => { + if (a.id === b.parent_job_id) return -1; + if (b.id === a.parent_job_id) return 1; + return new Date(b.created_at) - new Date(a.created_at); + }); + jobListBody.innerHTML = ''; for (const job of jobs.reverse()) { renderJobRow(job); if (['pending', 'processing'].includes(job.status)) startPolling(job.id); } + handleSelectionChange(); } catch (error) { console.error("Couldn't load job history:", error); if (error.message !== 'Session expired') { - jobListBody.innerHTML = '