# autobroadcaster/agent.py

import requests
import sqlite3
import time
import subprocess
import hashlib
import os
import sys
import logging
from datetime import datetime, timedelta
from pathlib import Path
from functools import wraps
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Load configuration from environment variables
NOCODB_URL = os.getenv("NOCODB_URL")
NOCODB_TOKEN = os.getenv("NOCODB_TOKEN")
RTMP_SERVER = os.getenv("RTMP_SERVER")
RAW_DIR = Path(os.getenv("RAW_DIR", "/root/surowe_filmy"))
FINAL_DIR = Path(os.getenv("FINAL_DIR", "/root/przygotowane_filmy"))
WHISPER_MODEL = os.getenv("WHISPER_MODEL", "/root/models/ggml-base.bin")
VAAPI_DEVICE = os.getenv("VAAPI_DEVICE", "/dev/dri/renderD128")
STREAM_GRACE_PERIOD_MINUTES = int(os.getenv("STREAM_GRACE_PERIOD_MINUTES", "15"))
# Timing configuration
NOCODB_SYNC_INTERVAL_SECONDS = int(os.getenv("NOCODB_SYNC_INTERVAL_SECONDS", "60"))
WATCHDOG_CHECK_INTERVAL_SECONDS = int(os.getenv("WATCHDOG_CHECK_INTERVAL_SECONDS", "10"))
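# Illustrative environment configuration (values are placeholders, not shipped
# defaults; the exact NocoDB endpoint and RTMP target depend on the deployment):
#   NOCODB_URL=https://nocodb.example.com/api/v2/tables/<table_id>/records
#   NOCODB_TOKEN=<xc-token generated in NocoDB>
#   RTMP_SERVER=rtmp://live.example.com/live/<stream-key>
#   STREAM_GRACE_PERIOD_MINUTES=15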
# Configuration validation
def validate_config():
"""Validate required environment variables on startup."""
required_vars = {
"NOCODB_URL": NOCODB_URL,
"NOCODB_TOKEN": NOCODB_TOKEN,
"RTMP_SERVER": RTMP_SERVER
}
missing = [name for name, value in required_vars.items() if not value]
if missing:
logger.error(f"Missing required environment variables: {', '.join(missing)}")
logger.error("Please set the following environment variables:")
logger.error(" NOCODB_URL - NocoDB API endpoint")
logger.error(" NOCODB_TOKEN - Authentication token")
logger.error(" RTMP_SERVER - Streaming destination")
sys.exit(1)
# Validate directories exist or can be created
for dir_path in [RAW_DIR, FINAL_DIR]:
try:
dir_path.mkdir(parents=True, exist_ok=True)
except Exception as e:
logger.error(f"Cannot create directory {dir_path}: {e}")
sys.exit(1)
logger.info("Configuration validated successfully")
logger.info(f"RAW_DIR: {RAW_DIR}")
logger.info(f"FINAL_DIR: {FINAL_DIR}")
logger.info(f"WHISPER_MODEL: {WHISPER_MODEL}")
logger.info(f"VAAPI_DEVICE: {VAAPI_DEVICE}")
logger.info(f"STREAM_GRACE_PERIOD: {STREAM_GRACE_PERIOD_MINUTES} minutes")
logger.info(f"NOCODB_SYNC_INTERVAL: {NOCODB_SYNC_INTERVAL_SECONDS} seconds")
logger.info(f"WATCHDOG_CHECK_INTERVAL: {WATCHDOG_CHECK_INTERVAL_SECONDS} seconds")
# Database setup
db = sqlite3.connect("scheduler.db", check_same_thread=False)
db.row_factory = sqlite3.Row
# Ensure table exists with log column and streaming metadata
db.execute("""
CREATE TABLE IF NOT EXISTS jobs (
nocodb_id TEXT PRIMARY KEY,
title TEXT,
run_at TIMESTAMP,
prep_at TIMESTAMP,
raw_path TEXT,
final_path TEXT,
prep_status TEXT,
play_status TEXT,
log TEXT,
stream_start_time TIMESTAMP,
stream_retry_count INTEGER DEFAULT 0
)
""")
db.commit()
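# Status lifecycle as used below: prep_status moves 'pending' -> 'done' (or 'failed' /
# 'skipped'); play_status moves 'pending' -> 'streaming' -> 'done' (or 'failed' / 'skipped').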
# Add new columns if they don't exist (for existing databases)
try:
db.execute("ALTER TABLE jobs ADD COLUMN stream_start_time TIMESTAMP")
db.commit()
except sqlite3.OperationalError:
    # Column already exists
    pass
try:
db.execute("ALTER TABLE jobs ADD COLUMN stream_retry_count INTEGER DEFAULT 0")
db.commit()
except sqlite3.OperationalError:
    # Column already exists
    pass
# Track streaming process
streaming_process = None
current_streaming_job = None
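# streaming_process holds the Popen handle of the ffmpeg process publishing to
# RTMP_SERVER (None when idle); current_streaming_job is the nocodb_id it belongs to.
# monitor_and_restart_stream() combines these with the jobs table to detect crashes.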
def log_to_db(nocodb_id, message):
"""Append a timestamped log message to the database log column."""
timestamp = datetime.now().isoformat()
log_entry = f"[{timestamp}] {message}\n"
try:
db.execute("""
UPDATE jobs SET log = log || ? WHERE nocodb_id = ?
""", (log_entry, nocodb_id))
db.commit()
logger.info(f"Job {nocodb_id}: {message}")
except Exception as e:
logger.error(f"Failed to log to database: {e}")
def retry_with_backoff(max_attempts=3, base_delay=1):
"""Decorator to retry a function with exponential backoff."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_attempts:
logger.error(f"{func.__name__} failed after {max_attempts} attempts: {e}")
raise
delay = base_delay * (2 ** (attempt - 1))
logger.warning(f"{func.__name__} attempt {attempt} failed: {e}. Retrying in {delay}s...")
time.sleep(delay)
return wrapper
return decorator
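# With the defaults (max_attempts=3, base_delay=1) a persistently failing call is
# attempted at roughly t=0s, t=1s and t=3s (delays of 1s, then 2s) before the last
# exception is re-raised to the caller.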
@retry_with_backoff(max_attempts=3)
def sync():
"""Sync jobs from NocoDB."""
try:
response = requests.get(
NOCODB_URL,
headers={"xc-token": NOCODB_TOKEN},
timeout=30
)
response.raise_for_status()
rows = response.json().get("list", [])
logger.info(f"Fetched {len(rows)} jobs from NocoDB")
for r in rows:
try:
run_at = datetime.fromisoformat(r["Date"])
prep_at = run_at - timedelta(hours=6)
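                # Example: a row with Date 2024-06-01T18:00 becomes eligible for
                # preparation from 12:00 and for streaming at 18:00.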
# Preserve existing status and streaming data on sync
existing = db.execute("SELECT * FROM jobs WHERE nocodb_id=?", (r["Id"],)).fetchone()
if existing:
db.execute("""
UPDATE jobs SET title=?, run_at=?, prep_at=?
WHERE nocodb_id=?
""", (r["Title"], run_at, prep_at, r["Id"]))
else:
db.execute("""
INSERT INTO jobs (nocodb_id, title, run_at, prep_at, raw_path, final_path,
prep_status, play_status, log, stream_start_time, stream_retry_count)
VALUES (?,?,?,?,?,?,?,?,?,?,?)
""", (r["Id"], r["Title"], run_at, prep_at, None, None, 'pending', 'pending', '', None, 0))
except Exception as e:
logger.error(f"Failed to process row {r.get('Id', 'unknown')}: {e}")
db.commit()
except requests.exceptions.RequestException as e:
logger.error(f"Failed to sync from NocoDB: {e}")
raise
def take_prep():
"""Get the next job ready for preparation."""
try:
        now = datetime.now()
        # Mark still-pending jobs whose streaming time has already passed as skipped.
        # Compare against the same clock used everywhere else (datetime.now()) instead of
        # SQLite's CURRENT_TIMESTAMP, which is UTC and may not match the stored times.
        db.execute("""
            UPDATE jobs
            SET prep_status='skipped', play_status='skipped',
                log = log || ?
            WHERE prep_status='pending'
              AND run_at <= ?
        """, (f"[{now.isoformat()}] Job skipped - streaming time already passed\n", now))
        db.commit()
        # Get the next overdue or upcoming prep job
        c = db.execute("""
            SELECT * FROM jobs WHERE prep_status='pending' AND prep_at <= ? LIMIT 1
        """, (now,))
job = c.fetchone()
if job:
# Check if this is an overdue job
prep_at = datetime.fromisoformat(job["prep_at"])
if prep_at < datetime.now() - timedelta(minutes=5):
logger.warning(f"Processing overdue prep job: {job['nocodb_id']} - {job['title']} (was due {prep_at})")
return job
except Exception as e:
logger.error(f"Failed to query prep jobs: {e}")
return None
def take_play():
"""Get the next job ready for streaming with configurable grace period."""
try:
        now = datetime.now()
        # Grace period cutoff (default: 15 minutes before now)
        grace_period_cutoff = now - timedelta(minutes=STREAM_GRACE_PERIOD_MINUTES)
        # Mark jobs more than STREAM_GRACE_PERIOD_MINUTES overdue as skipped. The cutoff
        # is bound as a datetime so it is compared in the same format (and with the same
        # clock) as the stored run_at values.
        db.execute("""
            UPDATE jobs
            SET play_status='skipped',
                log = log || ?
            WHERE prep_status='done'
              AND play_status='pending'
              AND run_at < ?
        """, (
            f"[{now.isoformat()}] Streaming skipped - more than {STREAM_GRACE_PERIOD_MINUTES} minutes late\n",
            grace_period_cutoff
        ))
        db.commit()
        # Get jobs ready to stream (on time or within the grace period)
        c = db.execute("""
            SELECT * FROM jobs
            WHERE prep_status='done'
              AND play_status='pending'
              AND run_at <= ?
            LIMIT 1
        """, (now,))
job = c.fetchone()
if job:
# Check if this is a late start
run_at = datetime.fromisoformat(job["run_at"])
delay = datetime.now() - run_at
if delay > timedelta(seconds=30):
minutes_late = delay.total_seconds() / 60
logger.warning(f"Starting stream late: {job['nocodb_id']} - {job['title']} ({minutes_late:.1f} minutes after scheduled time)")
return job
except Exception as e:
logger.error(f"Failed to query play jobs: {e}")
return None
def run_subprocess(cmd, job_id, description, timeout=None):
"""Run a subprocess with error handling and logging."""
log_to_db(job_id, f"Starting: {description}")
logger.info(f"Running command: {' '.join(str(c) for c in cmd)}")
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout,
check=True
)
if result.stdout:
logger.debug(f"stdout: {result.stdout[:500]}")
log_to_db(job_id, f"Completed: {description}")
return result
except subprocess.TimeoutExpired:
error_msg = f"Timeout: {description} exceeded {timeout}s"
log_to_db(job_id, f"ERROR: {error_msg}")
raise Exception(error_msg)
except subprocess.CalledProcessError as e:
error_msg = f"Command failed with code {e.returncode}: {e.stderr[:500] if e.stderr else 'no error output'}"
log_to_db(job_id, f"ERROR: {description} - {error_msg}")
raise Exception(error_msg)
@retry_with_backoff(max_attempts=3, base_delay=2)
def generate_subtitles(raw_file, job_id):
"""Generate subtitles using whisper.cpp."""
srt_file = raw_file.with_suffix(".srt")
# Remove existing subtitle file if present
if srt_file.exists():
srt_file.unlink()
log_to_db(job_id, f"Removed existing subtitle file: {srt_file}")
# Run whisper.cpp with correct format: whisper.cpp -m <model> -f <file> -osrt
cmd = [
"whisper.cpp",
"-m", str(WHISPER_MODEL),
"-f", str(raw_file),
"-osrt"
]
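    # This assumes a `whisper.cpp` executable on PATH that accepts the raw file directly
    # and, given -osrt, writes the subtitles to the .srt path checked below; adjust the
    # binary name and arguments to match the local whisper.cpp build if it differs.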
run_subprocess(cmd, job_id, "Subtitle generation with whisper.cpp", timeout=3600)
# Verify subtitle file was created
if not srt_file.exists():
error_msg = f"Subtitle file not created: {srt_file}"
log_to_db(job_id, f"ERROR: {error_msg}")
raise Exception(error_msg)
log_to_db(job_id, f"Subtitle file created successfully: {srt_file}")
return srt_file
@retry_with_backoff(max_attempts=3, base_delay=2)
def encode_video_with_subtitles(raw_file, srt_file, final_file, job_id):
"""Encode video with burned-in subtitles using VAAPI."""
# Remove existing output file if present
if final_file.exists():
final_file.unlink()
log_to_db(job_id, f"Removed existing output file: {final_file}")
# FFmpeg command with VAAPI encoding (h264_vaapi instead of h264_qsv)
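    # Pipeline sketch (assumes a VAAPI-capable GPU at VAAPI_DEVICE): decode, download the
    # frames to system memory, burn the subtitles with the software `subtitles` filter,
    # then hwupload them so the h264_vaapi encoder can run on the GPU.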
cmd = [
"ffmpeg",
"-hwaccel", "vaapi",
"-vaapi_device", VAAPI_DEVICE,
"-i", str(raw_file),
"-vf", f"subtitles={srt_file}:force_style=Fontname=Consolas,BackColour=&H80000000,Spacing=0.2,Outline=0,Shadow=0.75,format=yuv420p",
"-c:v", "h264_vaapi", # Changed from h264_qsv to h264_vaapi
"-qp", "23", # Quality parameter for VAAPI (similar to CRF)
"-c:a", "aac",
"-b:a", "192k",
"-movflags", "faststart",
"-y", # Overwrite output file
str(final_file)
]
run_subprocess(cmd, job_id, "Video encoding with VAAPI", timeout=7200)
# Verify output file was created
if not final_file.exists():
error_msg = f"Output video file not created: {final_file}"
log_to_db(job_id, f"ERROR: {error_msg}")
raise Exception(error_msg)
file_size = final_file.stat().st_size / (1024 * 1024) # Size in MB
log_to_db(job_id, f"Video encoded successfully: {final_file} ({file_size:.2f} MB)")
return final_file
def prepare_job(job):
"""Prepare a job: generate subtitles and encode video."""
job_id = job["nocodb_id"]
title = job["title"]
try:
log_to_db(job_id, f"Starting preparation for: {title}")
# Find raw video file
matching_files = list(RAW_DIR.glob(f"*{title}*"))
if not matching_files:
error_msg = f"No files found matching title: {title}"
log_to_db(job_id, f"ERROR: {error_msg}")
db.execute("UPDATE jobs SET prep_status='failed' WHERE nocodb_id=?", (job_id,))
db.commit()
return
raw_file = max(matching_files, key=lambda x: x.stat().st_mtime)
log_to_db(job_id, f"Found raw file: {raw_file}")
# Generate subtitles
srt_file = generate_subtitles(raw_file, job_id)
# Prepare output filename
final_file = FINAL_DIR / (raw_file.stem + ".converted.mp4")
# Encode video with subtitles
encode_video_with_subtitles(raw_file, srt_file, final_file, job_id)
# Update database
db.execute("""
UPDATE jobs SET prep_status='done', raw_path=?, final_path=? WHERE nocodb_id=?
""", (str(raw_file), str(final_file), job_id))
db.commit()
log_to_db(job_id, "Preparation completed successfully")
except Exception as e:
error_msg = f"Preparation failed: {e}"
log_to_db(job_id, f"ERROR: {error_msg}")
logger.error(f"Job {job_id} preparation failed: {e}")
db.execute("UPDATE jobs SET prep_status='failed' WHERE nocodb_id=?", (job_id,))
db.commit()
def get_video_duration(video_path):
"""Get video duration in seconds using ffprobe."""
try:
result = subprocess.run(
[
"ffprobe",
"-v", "error",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
str(video_path)
],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0 and result.stdout.strip():
return float(result.stdout.strip())
except Exception as e:
logger.error(f"Failed to get video duration: {e}")
return None
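# The ffprobe invocation above prints only the container duration in seconds,
# e.g. "1824.533333", which is parsed as a float; None is returned on any failure.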
def stream_job(job, seek_seconds=0):
"""Start streaming a prepared job with optional seek position."""
global streaming_process, current_streaming_job
job_id = job["nocodb_id"]
final_path = job["final_path"]
try:
# Check if starting late or restarting
run_at = datetime.fromisoformat(job["run_at"])
delay = datetime.now() - run_at
if seek_seconds > 0:
log_to_db(job_id, f"Restarting stream at position {seek_seconds:.1f}s")
elif delay > timedelta(seconds=30):
minutes_late = delay.total_seconds() / 60
log_to_db(job_id, f"Starting stream {minutes_late:.1f} minutes late")
log_to_db(job_id, f"Starting stream to: {RTMP_SERVER}")
# Verify file exists
if not Path(final_path).exists():
error_msg = f"Final video file not found: {final_path}"
log_to_db(job_id, f"ERROR: {error_msg}")
db.execute("UPDATE jobs SET play_status='failed' WHERE nocodb_id=?", (job_id,))
db.commit()
return False
# Get video duration to know when it should finish
video_duration = get_video_duration(final_path)
if not video_duration:
logger.warning(f"Could not determine video duration for {final_path}")
# Stop previous stream if running
if streaming_process and streaming_process.poll() is None:
logger.info(f"Stopping previous stream (PID: {streaming_process.pid})")
streaming_process.terminate()
try:
streaming_process.wait(timeout=5)
except subprocess.TimeoutExpired:
streaming_process.kill()
log_to_db(job_id, "Stopped previous stream")
        # Build the ffmpeg command with optional seek. Keep console output minimal
        # (-nostats -loglevel error): stderr is only drained after the process exits, so
        # verbose progress stats could eventually fill the pipe buffer and stall ffmpeg.
        cmd = ["ffmpeg", "-nostats", "-loglevel", "error"]
if seek_seconds > 0:
# Seek to position (input seeking is faster)
cmd.extend(["-ss", str(seek_seconds)])
cmd.extend([
"-re", # Read input at native frame rate
"-i", final_path,
"-c", "copy", # Copy streams without re-encoding
"-f", "flv",
RTMP_SERVER
])
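        # Illustrative resulting command for a restart 12 minutes in (paths and URL are
        # placeholders): ffmpeg -nostats -loglevel error -ss 720.0 -re -i final.mp4 -c copy -f flv rtmp://...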
logger.info(f"Starting stream: {' '.join(cmd)}")
streaming_process = subprocess.Popen(
cmd,
            stdout=subprocess.DEVNULL,  # stdout is unused; stderr is kept for post-mortem diagnostics
stderr=subprocess.PIPE
)
        # Record the effective start time (used to calculate the seek position on restart).
        # Back-dating by the seek offset keeps the elapsed-time maths in
        # monitor_and_restart_stream() pointed at the real position in the file, even
        # after several restarts.
        stream_start_time = datetime.now() - timedelta(seconds=seek_seconds)
current_streaming_job = job_id
log_to_db(job_id, f"Stream started (PID: {streaming_process.pid})")
# Update database - set status to 'streaming', not 'done'
db.execute("""
UPDATE jobs
SET play_status='streaming',
stream_start_time=?,
stream_retry_count=stream_retry_count+1
WHERE nocodb_id=?
""", (stream_start_time.isoformat(), job_id))
db.commit()
return True
except Exception as e:
error_msg = f"Streaming failed: {e}"
log_to_db(job_id, f"ERROR: {error_msg}")
logger.error(f"Job {job_id} streaming failed: {e}")
db.execute("UPDATE jobs SET play_status='failed' WHERE nocodb_id=?", (job_id,))
db.commit()
return False
def monitor_and_restart_stream():
"""Monitor active stream and restart if it fails."""
global streaming_process, current_streaming_job
# Check if there's supposed to be an active stream
try:
active_stream = db.execute("""
SELECT * FROM jobs WHERE play_status='streaming' LIMIT 1
""").fetchone()
if not active_stream:
# No active stream expected
streaming_process = None
current_streaming_job = None
return
job_id = active_stream["nocodb_id"]
final_path = active_stream["final_path"]
stream_start_time = active_stream["stream_start_time"]
retry_count = active_stream["stream_retry_count"] or 0
# Check if process is still running
if streaming_process and streaming_process.poll() is None:
# Stream is running fine
return
# Stream is not running but should be
if streaming_process:
# Process exited - check if it was normal completion or error
return_code = streaming_process.returncode
if return_code == 0:
# Normal completion - video finished
logger.info(f"Stream completed successfully for job {job_id}")
log_to_db(job_id, "Stream completed successfully")
db.execute("UPDATE jobs SET play_status='done' WHERE nocodb_id=?", (job_id,))
db.commit()
streaming_process = None
current_streaming_job = None
return
else:
# Error exit
stderr = streaming_process.stderr.read().decode('utf-8')[-500:] if streaming_process.stderr else ""
logger.error(f"Stream crashed for job {job_id} with code {return_code}: {stderr}")
log_to_db(job_id, f"Stream crashed (exit code {return_code})")
# Calculate how much time has elapsed since stream started
if not stream_start_time:
logger.error(f"No stream start time recorded for job {job_id}")
db.execute("UPDATE jobs SET play_status='failed' WHERE nocodb_id=?", (job_id,))
db.commit()
return
start_time = datetime.fromisoformat(stream_start_time)
elapsed_seconds = (datetime.now() - start_time).total_seconds()
# Get video duration to check if we should still be streaming
video_duration = get_video_duration(final_path)
if video_duration and elapsed_seconds >= video_duration:
# Video should have finished by now
logger.info(f"Stream duration exceeded for job {job_id} - marking as done")
log_to_db(job_id, "Stream duration completed")
db.execute("UPDATE jobs SET play_status='done' WHERE nocodb_id=?", (job_id,))
db.commit()
streaming_process = None
current_streaming_job = None
return
# Check retry limit (max 10 restarts)
if retry_count >= 10:
logger.error(f"Stream retry limit exceeded for job {job_id}")
log_to_db(job_id, "ERROR: Stream failed after 10 restart attempts")
db.execute("UPDATE jobs SET play_status='failed' WHERE nocodb_id=?", (job_id,))
db.commit()
streaming_process = None
current_streaming_job = None
return
# Restart stream at correct position
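        # Example: a stream that began at 18:00 and dies at 18:12 is restarted with
        # seek_seconds of roughly 720; because stream_job() back-dates the recorded start
        # by the seek offset, the resume position keeps accumulating across restarts.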
logger.warning(f"Restarting stream for job {job_id} at position {elapsed_seconds:.1f}s (attempt {retry_count + 1})")
stream_job(active_stream, seek_seconds=elapsed_seconds)
except Exception as e:
logger.error(f"Error in stream monitor: {e}")
def main():
"""Main scheduler loop."""
logger.info("Starting scheduler...")
validate_config()
# Check for overdue jobs on startup
try:
        now = datetime.now()
        grace_period_cutoff = now - timedelta(minutes=STREAM_GRACE_PERIOD_MINUTES)
        # Bind datetimes directly so the comparisons use the same clock and format as the
        # stored run_at/prep_at values (rather than SQLite's UTC CURRENT_TIMESTAMP).
        overdue_prep = db.execute("""
            SELECT COUNT(*) as count FROM jobs
            WHERE prep_status='pending' AND prep_at <= ? AND run_at > ?
        """, (now, now)).fetchone()
        skipped_prep = db.execute("""
            SELECT COUNT(*) as count FROM jobs
            WHERE prep_status='pending' AND run_at <= ?
        """, (now,)).fetchone()
        overdue_stream = db.execute("""
            SELECT COUNT(*) as count FROM jobs
            WHERE prep_status='done' AND play_status='pending' AND run_at <= ? AND run_at >= ?
        """, (now, grace_period_cutoff)).fetchone()
        skipped_stream = db.execute("""
            SELECT COUNT(*) as count FROM jobs
            WHERE prep_status='done' AND play_status='pending' AND run_at < ?
        """, (grace_period_cutoff,)).fetchone()
if overdue_prep and overdue_prep["count"] > 0:
logger.warning(f"Found {overdue_prep['count']} overdue prep job(s) - will process immediately")
if skipped_prep and skipped_prep["count"] > 0:
logger.warning(f"Found {skipped_prep['count']} unprepared job(s) past streaming time - will be marked as skipped")
if overdue_stream and overdue_stream["count"] > 0:
logger.warning(f"Found {overdue_stream['count']} overdue streaming job(s) - will start within grace period ({STREAM_GRACE_PERIOD_MINUTES}min)")
if skipped_stream and skipped_stream["count"] > 0:
logger.warning(f"Found {skipped_stream['count']} streaming job(s) more than {STREAM_GRACE_PERIOD_MINUTES}min late - will be marked as skipped")
except Exception as e:
logger.error(f"Failed to check for overdue jobs: {e}")
logger.info("Scheduler is running. Press Ctrl+C to stop.")
try:
# Calculate how many iterations between syncs
sync_iterations = max(1, NOCODB_SYNC_INTERVAL_SECONDS // WATCHDOG_CHECK_INTERVAL_SECONDS)
logger.info(f"Main loop: checking every {WATCHDOG_CHECK_INTERVAL_SECONDS}s, syncing every {sync_iterations} iterations ({NOCODB_SYNC_INTERVAL_SECONDS}s)")
iteration = 0
while True:
try:
# Monitor and restart stream if needed (check every iteration)
monitor_and_restart_stream()
# Sync jobs from NocoDB at configured interval
if iteration % sync_iterations == 0:
sync()
# Process preparation jobs
job = take_prep()
if job:
logger.info(f"Processing prep job: {job['nocodb_id']} - {job['title']}")
prepare_job(job)
# Process streaming jobs
job = take_play()
if job:
logger.info(f"Processing play job: {job['nocodb_id']} - {job['title']}")
stream_job(job)
except Exception as e:
logger.error(f"Error in main loop: {e}")
# Sleep between iterations
time.sleep(WATCHDOG_CHECK_INTERVAL_SECONDS)
iteration += 1
except KeyboardInterrupt:
logger.info("Scheduler stopped by user")
if streaming_process and streaming_process.poll() is None:
logger.info("Stopping active stream...")
streaming_process.terminate()
finally:
db.close()
if __name__ == "__main__":
main()