import requests import sqlite3 import time import subprocess import hashlib import os import sys import logging from datetime import datetime, timedelta from pathlib import Path from functools import wraps # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Load configuration from environment variables NOCODB_URL = os.getenv("NOCODB_URL") NOCODB_TOKEN = os.getenv("NOCODB_TOKEN") RTMP_SERVER = os.getenv("RTMP_SERVER") RAW_DIR = Path(os.getenv("RAW_DIR", "/root/surowe_filmy")) FINAL_DIR = Path(os.getenv("FINAL_DIR", "/root/przygotowane_filmy")) WHISPER_MODEL = os.getenv("WHISPER_MODEL", "/root/models/ggml-base.bin") VAAPI_DEVICE = os.getenv("VAAPI_DEVICE", "/dev/dri/renderD128") STREAM_GRACE_PERIOD_MINUTES = int(os.getenv("STREAM_GRACE_PERIOD_MINUTES", "15")) # Timing configuration NOCODB_SYNC_INTERVAL_SECONDS = int(os.getenv("NOCODB_SYNC_INTERVAL_SECONDS", "60")) WATCHDOG_CHECK_INTERVAL_SECONDS = int(os.getenv("WATCHDOG_CHECK_INTERVAL_SECONDS", "10")) # Configuration validation def validate_config(): """Validate required environment variables on startup.""" required_vars = { "NOCODB_URL": NOCODB_URL, "NOCODB_TOKEN": NOCODB_TOKEN, "RTMP_SERVER": RTMP_SERVER } missing = [name for name, value in required_vars.items() if not value] if missing: logger.error(f"Missing required environment variables: {', '.join(missing)}") logger.error("Please set the following environment variables:") logger.error(" NOCODB_URL - NocoDB API endpoint") logger.error(" NOCODB_TOKEN - Authentication token") logger.error(" RTMP_SERVER - Streaming destination") sys.exit(1) # Validate directories exist or can be created for dir_path in [RAW_DIR, FINAL_DIR]: try: dir_path.mkdir(parents=True, exist_ok=True) except Exception as e: logger.error(f"Cannot create directory {dir_path}: {e}") sys.exit(1) logger.info("Configuration validated successfully") logger.info(f"RAW_DIR: {RAW_DIR}") logger.info(f"FINAL_DIR: {FINAL_DIR}") logger.info(f"WHISPER_MODEL: {WHISPER_MODEL}") logger.info(f"VAAPI_DEVICE: {VAAPI_DEVICE}") logger.info(f"STREAM_GRACE_PERIOD: {STREAM_GRACE_PERIOD_MINUTES} minutes") logger.info(f"NOCODB_SYNC_INTERVAL: {NOCODB_SYNC_INTERVAL_SECONDS} seconds") logger.info(f"WATCHDOG_CHECK_INTERVAL: {WATCHDOG_CHECK_INTERVAL_SECONDS} seconds") # Database setup db = sqlite3.connect("scheduler.db", check_same_thread=False) db.row_factory = sqlite3.Row # Ensure table exists with log column and streaming metadata db.execute(""" CREATE TABLE IF NOT EXISTS jobs ( nocodb_id TEXT PRIMARY KEY, title TEXT, run_at TIMESTAMP, prep_at TIMESTAMP, raw_path TEXT, final_path TEXT, prep_status TEXT, play_status TEXT, log TEXT, stream_start_time TIMESTAMP, stream_retry_count INTEGER DEFAULT 0 ) """) db.commit() # Add new columns if they don't exist (for existing databases) try: db.execute("ALTER TABLE jobs ADD COLUMN stream_start_time TIMESTAMP") db.commit() except: pass try: db.execute("ALTER TABLE jobs ADD COLUMN stream_retry_count INTEGER DEFAULT 0") db.commit() except: pass # Track streaming process streaming_process = None current_streaming_job = None def log_to_db(nocodb_id, message): """Append a timestamped log message to the database log column.""" timestamp = datetime.now().isoformat() log_entry = f"[{timestamp}] {message}\n" try: db.execute(""" UPDATE jobs SET log = log || ? WHERE nocodb_id = ? """, (log_entry, nocodb_id)) db.commit() logger.info(f"Job {nocodb_id}: {message}") except Exception as e: logger.error(f"Failed to log to database: {e}") def retry_with_backoff(max_attempts=3, base_delay=1): """Decorator to retry a function with exponential backoff.""" def decorator(func): @wraps(func) def wrapper(*args, **kwargs): for attempt in range(1, max_attempts + 1): try: return func(*args, **kwargs) except Exception as e: if attempt == max_attempts: logger.error(f"{func.__name__} failed after {max_attempts} attempts: {e}") raise delay = base_delay * (2 ** (attempt - 1)) logger.warning(f"{func.__name__} attempt {attempt} failed: {e}. Retrying in {delay}s...") time.sleep(delay) return wrapper return decorator @retry_with_backoff(max_attempts=3) def sync(): """Sync jobs from NocoDB.""" try: response = requests.get( NOCODB_URL, headers={"xc-token": NOCODB_TOKEN}, timeout=30 ) response.raise_for_status() rows = response.json().get("list", []) logger.info(f"Fetched {len(rows)} jobs from NocoDB") for r in rows: try: run_at = datetime.fromisoformat(r["Date"]) prep_at = run_at - timedelta(hours=6) # Preserve existing status and streaming data on sync existing = db.execute("SELECT * FROM jobs WHERE nocodb_id=?", (r["Id"],)).fetchone() if existing: db.execute(""" UPDATE jobs SET title=?, run_at=?, prep_at=? WHERE nocodb_id=? """, (r["Title"], run_at, prep_at, r["Id"])) else: db.execute(""" INSERT INTO jobs (nocodb_id, title, run_at, prep_at, raw_path, final_path, prep_status, play_status, log, stream_start_time, stream_retry_count) VALUES (?,?,?,?,?,?,?,?,?,?,?) """, (r["Id"], r["Title"], run_at, prep_at, None, None, 'pending', 'pending', '', None, 0)) except Exception as e: logger.error(f"Failed to process row {r.get('Id', 'unknown')}: {e}") db.commit() except requests.exceptions.RequestException as e: logger.error(f"Failed to sync from NocoDB: {e}") raise def take_prep(): """Get the next job ready for preparation.""" try: # First, mark jobs as skipped if both prep and run times have passed db.execute(""" UPDATE jobs SET prep_status='skipped', play_status='skipped', log = log || ? WHERE prep_status='pending' AND run_at <= CURRENT_TIMESTAMP """, (f"[{datetime.now().isoformat()}] Job skipped - streaming time already passed\n",)) db.commit() # Get next overdue or upcoming prep job c = db.execute(""" SELECT * FROM jobs WHERE prep_status='pending' AND prep_at <= CURRENT_TIMESTAMP LIMIT 1 """) job = c.fetchone() if job: # Check if this is an overdue job prep_at = datetime.fromisoformat(job["prep_at"]) if prep_at < datetime.now() - timedelta(minutes=5): logger.warning(f"Processing overdue prep job: {job['nocodb_id']} - {job['title']} (was due {prep_at})") return job except Exception as e: logger.error(f"Failed to query prep jobs: {e}") return None def take_play(): """Get the next job ready for streaming with configurable grace period.""" try: # Calculate grace period cutoff (default 15 minutes ago) grace_period_cutoff = datetime.now() - timedelta(minutes=STREAM_GRACE_PERIOD_MINUTES) # Mark jobs more than STREAM_GRACE_PERIOD_MINUTES overdue as skipped db.execute(""" UPDATE jobs SET play_status='skipped', log = log || ? WHERE prep_status='done' AND play_status='pending' AND run_at < ? """, ( f"[{datetime.now().isoformat()}] Streaming skipped - more than {STREAM_GRACE_PERIOD_MINUTES} minutes late\n", grace_period_cutoff.isoformat() )) db.commit() # Get jobs ready to stream (on time or within 15-minute grace period) c = db.execute(""" SELECT * FROM jobs WHERE prep_status='done' AND play_status='pending' AND run_at <= CURRENT_TIMESTAMP LIMIT 1 """) job = c.fetchone() if job: # Check if this is a late start run_at = datetime.fromisoformat(job["run_at"]) delay = datetime.now() - run_at if delay > timedelta(seconds=30): minutes_late = delay.total_seconds() / 60 logger.warning(f"Starting stream late: {job['nocodb_id']} - {job['title']} ({minutes_late:.1f} minutes after scheduled time)") return job except Exception as e: logger.error(f"Failed to query play jobs: {e}") return None def run_subprocess(cmd, job_id, description, timeout=None): """Run a subprocess with error handling and logging.""" log_to_db(job_id, f"Starting: {description}") logger.info(f"Running command: {' '.join(str(c) for c in cmd)}") try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout, check=True ) if result.stdout: logger.debug(f"stdout: {result.stdout[:500]}") log_to_db(job_id, f"Completed: {description}") return result except subprocess.TimeoutExpired: error_msg = f"Timeout: {description} exceeded {timeout}s" log_to_db(job_id, f"ERROR: {error_msg}") raise Exception(error_msg) except subprocess.CalledProcessError as e: error_msg = f"Command failed with code {e.returncode}: {e.stderr[:500] if e.stderr else 'no error output'}" log_to_db(job_id, f"ERROR: {description} - {error_msg}") raise Exception(error_msg) @retry_with_backoff(max_attempts=3, base_delay=2) def generate_subtitles(raw_file, job_id): """Generate subtitles using whisper.cpp.""" srt_file = raw_file.with_suffix(".srt") # Remove existing subtitle file if present if srt_file.exists(): srt_file.unlink() log_to_db(job_id, f"Removed existing subtitle file: {srt_file}") # Run whisper.cpp with correct format: whisper.cpp -m -f -osrt cmd = [ "whisper.cpp", "-m", str(WHISPER_MODEL), "-f", str(raw_file), "-osrt" ] run_subprocess(cmd, job_id, "Subtitle generation with whisper.cpp", timeout=3600) # Verify subtitle file was created if not srt_file.exists(): error_msg = f"Subtitle file not created: {srt_file}" log_to_db(job_id, f"ERROR: {error_msg}") raise Exception(error_msg) log_to_db(job_id, f"Subtitle file created successfully: {srt_file}") return srt_file @retry_with_backoff(max_attempts=3, base_delay=2) def encode_video_with_subtitles(raw_file, srt_file, final_file, job_id): """Encode video with burned-in subtitles using VAAPI.""" # Remove existing output file if present if final_file.exists(): final_file.unlink() log_to_db(job_id, f"Removed existing output file: {final_file}") # FFmpeg command with VAAPI encoding (h264_vaapi instead of h264_qsv) cmd = [ "ffmpeg", "-hwaccel", "vaapi", "-vaapi_device", VAAPI_DEVICE, "-i", str(raw_file), "-vf", f"subtitles={srt_file}:force_style=Fontname=Consolas,BackColour=&H80000000,Spacing=0.2,Outline=0,Shadow=0.75,format=yuv420p", "-c:v", "h264_vaapi", # Changed from h264_qsv to h264_vaapi "-qp", "23", # Quality parameter for VAAPI (similar to CRF) "-c:a", "aac", "-b:a", "192k", "-movflags", "faststart", "-y", # Overwrite output file str(final_file) ] run_subprocess(cmd, job_id, "Video encoding with VAAPI", timeout=7200) # Verify output file was created if not final_file.exists(): error_msg = f"Output video file not created: {final_file}" log_to_db(job_id, f"ERROR: {error_msg}") raise Exception(error_msg) file_size = final_file.stat().st_size / (1024 * 1024) # Size in MB log_to_db(job_id, f"Video encoded successfully: {final_file} ({file_size:.2f} MB)") return final_file def prepare_job(job): """Prepare a job: generate subtitles and encode video.""" job_id = job["nocodb_id"] title = job["title"] try: log_to_db(job_id, f"Starting preparation for: {title}") # Find raw video file matching_files = list(RAW_DIR.glob(f"*{title}*")) if not matching_files: error_msg = f"No files found matching title: {title}" log_to_db(job_id, f"ERROR: {error_msg}") db.execute("UPDATE jobs SET prep_status='failed' WHERE nocodb_id=?", (job_id,)) db.commit() return raw_file = max(matching_files, key=lambda x: x.stat().st_mtime) log_to_db(job_id, f"Found raw file: {raw_file}") # Generate subtitles srt_file = generate_subtitles(raw_file, job_id) # Prepare output filename final_file = FINAL_DIR / (raw_file.stem + ".converted.mp4") # Encode video with subtitles encode_video_with_subtitles(raw_file, srt_file, final_file, job_id) # Update database db.execute(""" UPDATE jobs SET prep_status='done', raw_path=?, final_path=? WHERE nocodb_id=? """, (str(raw_file), str(final_file), job_id)) db.commit() log_to_db(job_id, "Preparation completed successfully") except Exception as e: error_msg = f"Preparation failed: {e}" log_to_db(job_id, f"ERROR: {error_msg}") logger.error(f"Job {job_id} preparation failed: {e}") db.execute("UPDATE jobs SET prep_status='failed' WHERE nocodb_id=?", (job_id,)) db.commit() def get_video_duration(video_path): """Get video duration in seconds using ffprobe.""" try: result = subprocess.run( [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", str(video_path) ], capture_output=True, text=True, timeout=10 ) if result.returncode == 0 and result.stdout.strip(): return float(result.stdout.strip()) except Exception as e: logger.error(f"Failed to get video duration: {e}") return None def stream_job(job, seek_seconds=0): """Start streaming a prepared job with optional seek position.""" global streaming_process, current_streaming_job job_id = job["nocodb_id"] final_path = job["final_path"] try: # Check if starting late or restarting run_at = datetime.fromisoformat(job["run_at"]) delay = datetime.now() - run_at if seek_seconds > 0: log_to_db(job_id, f"Restarting stream at position {seek_seconds:.1f}s") elif delay > timedelta(seconds=30): minutes_late = delay.total_seconds() / 60 log_to_db(job_id, f"Starting stream {minutes_late:.1f} minutes late") log_to_db(job_id, f"Starting stream to: {RTMP_SERVER}") # Verify file exists if not Path(final_path).exists(): error_msg = f"Final video file not found: {final_path}" log_to_db(job_id, f"ERROR: {error_msg}") db.execute("UPDATE jobs SET play_status='failed' WHERE nocodb_id=?", (job_id,)) db.commit() return False # Get video duration to know when it should finish video_duration = get_video_duration(final_path) if not video_duration: logger.warning(f"Could not determine video duration for {final_path}") # Stop previous stream if running if streaming_process and streaming_process.poll() is None: logger.info(f"Stopping previous stream (PID: {streaming_process.pid})") streaming_process.terminate() try: streaming_process.wait(timeout=5) except subprocess.TimeoutExpired: streaming_process.kill() log_to_db(job_id, "Stopped previous stream") # Build ffmpeg command with optional seek cmd = ["ffmpeg"] if seek_seconds > 0: # Seek to position (input seeking is faster) cmd.extend(["-ss", str(seek_seconds)]) cmd.extend([ "-re", # Read input at native frame rate "-i", final_path, "-c", "copy", # Copy streams without re-encoding "-f", "flv", RTMP_SERVER ]) logger.info(f"Starting stream: {' '.join(cmd)}") streaming_process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) # Record when this stream started (for calculating seek position on restart) stream_start_time = datetime.now() current_streaming_job = job_id log_to_db(job_id, f"Stream started (PID: {streaming_process.pid})") # Update database - set status to 'streaming', not 'done' db.execute(""" UPDATE jobs SET play_status='streaming', stream_start_time=?, stream_retry_count=stream_retry_count+1 WHERE nocodb_id=? """, (stream_start_time.isoformat(), job_id)) db.commit() return True except Exception as e: error_msg = f"Streaming failed: {e}" log_to_db(job_id, f"ERROR: {error_msg}") logger.error(f"Job {job_id} streaming failed: {e}") db.execute("UPDATE jobs SET play_status='failed' WHERE nocodb_id=?", (job_id,)) db.commit() return False def monitor_and_restart_stream(): """Monitor active stream and restart if it fails.""" global streaming_process, current_streaming_job # Check if there's supposed to be an active stream try: active_stream = db.execute(""" SELECT * FROM jobs WHERE play_status='streaming' LIMIT 1 """).fetchone() if not active_stream: # No active stream expected streaming_process = None current_streaming_job = None return job_id = active_stream["nocodb_id"] final_path = active_stream["final_path"] stream_start_time = active_stream["stream_start_time"] retry_count = active_stream["stream_retry_count"] or 0 # Check if process is still running if streaming_process and streaming_process.poll() is None: # Stream is running fine return # Stream is not running but should be if streaming_process: # Process exited - check if it was normal completion or error return_code = streaming_process.returncode if return_code == 0: # Normal completion - video finished logger.info(f"Stream completed successfully for job {job_id}") log_to_db(job_id, "Stream completed successfully") db.execute("UPDATE jobs SET play_status='done' WHERE nocodb_id=?", (job_id,)) db.commit() streaming_process = None current_streaming_job = None return else: # Error exit stderr = streaming_process.stderr.read().decode('utf-8')[-500:] if streaming_process.stderr else "" logger.error(f"Stream crashed for job {job_id} with code {return_code}: {stderr}") log_to_db(job_id, f"Stream crashed (exit code {return_code})") # Calculate how much time has elapsed since stream started if not stream_start_time: logger.error(f"No stream start time recorded for job {job_id}") db.execute("UPDATE jobs SET play_status='failed' WHERE nocodb_id=?", (job_id,)) db.commit() return start_time = datetime.fromisoformat(stream_start_time) elapsed_seconds = (datetime.now() - start_time).total_seconds() # Get video duration to check if we should still be streaming video_duration = get_video_duration(final_path) if video_duration and elapsed_seconds >= video_duration: # Video should have finished by now logger.info(f"Stream duration exceeded for job {job_id} - marking as done") log_to_db(job_id, "Stream duration completed") db.execute("UPDATE jobs SET play_status='done' WHERE nocodb_id=?", (job_id,)) db.commit() streaming_process = None current_streaming_job = None return # Check retry limit (max 10 restarts) if retry_count >= 10: logger.error(f"Stream retry limit exceeded for job {job_id}") log_to_db(job_id, "ERROR: Stream failed after 10 restart attempts") db.execute("UPDATE jobs SET play_status='failed' WHERE nocodb_id=?", (job_id,)) db.commit() streaming_process = None current_streaming_job = None return # Restart stream at correct position logger.warning(f"Restarting stream for job {job_id} at position {elapsed_seconds:.1f}s (attempt {retry_count + 1})") stream_job(active_stream, seek_seconds=elapsed_seconds) except Exception as e: logger.error(f"Error in stream monitor: {e}") def main(): """Main scheduler loop.""" logger.info("Starting scheduler...") validate_config() # Check for overdue jobs on startup try: grace_period_cutoff = datetime.now() - timedelta(minutes=STREAM_GRACE_PERIOD_MINUTES) overdue_prep = db.execute(""" SELECT COUNT(*) as count FROM jobs WHERE prep_status='pending' AND prep_at <= CURRENT_TIMESTAMP AND run_at > CURRENT_TIMESTAMP """).fetchone() skipped_prep = db.execute(""" SELECT COUNT(*) as count FROM jobs WHERE prep_status='pending' AND run_at <= CURRENT_TIMESTAMP """).fetchone() overdue_stream = db.execute(""" SELECT COUNT(*) as count FROM jobs WHERE prep_status='done' AND play_status='pending' AND run_at <= CURRENT_TIMESTAMP AND run_at >= ? """, (grace_period_cutoff.isoformat(),)).fetchone() skipped_stream = db.execute(""" SELECT COUNT(*) as count FROM jobs WHERE prep_status='done' AND play_status='pending' AND run_at < ? """, (grace_period_cutoff.isoformat(),)).fetchone() if overdue_prep and overdue_prep["count"] > 0: logger.warning(f"Found {overdue_prep['count']} overdue prep job(s) - will process immediately") if skipped_prep and skipped_prep["count"] > 0: logger.warning(f"Found {skipped_prep['count']} unprepared job(s) past streaming time - will be marked as skipped") if overdue_stream and overdue_stream["count"] > 0: logger.warning(f"Found {overdue_stream['count']} overdue streaming job(s) - will start within grace period ({STREAM_GRACE_PERIOD_MINUTES}min)") if skipped_stream and skipped_stream["count"] > 0: logger.warning(f"Found {skipped_stream['count']} streaming job(s) more than {STREAM_GRACE_PERIOD_MINUTES}min late - will be marked as skipped") except Exception as e: logger.error(f"Failed to check for overdue jobs: {e}") logger.info("Scheduler is running. Press Ctrl+C to stop.") try: # Calculate how many iterations between syncs sync_iterations = max(1, NOCODB_SYNC_INTERVAL_SECONDS // WATCHDOG_CHECK_INTERVAL_SECONDS) logger.info(f"Main loop: checking every {WATCHDOG_CHECK_INTERVAL_SECONDS}s, syncing every {sync_iterations} iterations ({NOCODB_SYNC_INTERVAL_SECONDS}s)") iteration = 0 while True: try: # Monitor and restart stream if needed (check every iteration) monitor_and_restart_stream() # Sync jobs from NocoDB at configured interval if iteration % sync_iterations == 0: sync() # Process preparation jobs job = take_prep() if job: logger.info(f"Processing prep job: {job['nocodb_id']} - {job['title']}") prepare_job(job) # Process streaming jobs job = take_play() if job: logger.info(f"Processing play job: {job['nocodb_id']} - {job['title']}") stream_job(job) except Exception as e: logger.error(f"Error in main loop: {e}") # Sleep between iterations time.sleep(WATCHDOG_CHECK_INTERVAL_SECONDS) iteration += 1 except KeyboardInterrupt: logger.info("Scheduler stopped by user") if streaming_process and streaming_process.poll() is None: logger.info("Stopping active stream...") streaming_process.terminate() finally: db.close() if __name__ == "__main__": main()