Sunday, 1 December 2024

 Directory Structure:


└── ./

    ├── synthetic_tiktok

    │   ├── make_video.py

    │   ├── source_material.py

    │   └── upload.py

    └── main.py




---

File: /synthetic_tiktok/make_video.py

---


import cv2

import numpy as np

from PIL import Image

import io

import pyttsx3

from pathlib import Path

from dotenv import load_dotenv

import os

from typing import Tuple, List


load_dotenv()


SAVE_PATH = os.getenv("SAVE_PATH")


# Function to generate a silent slideshow video (audio files are accepted but not yet muxed in)

def generate_video(news: List[dict],
                   audio_files: List[str],
                   video_name: str,
                   fps: int = 24,
                   frame_size: Tuple[int, int] = (1080, 1920),
                   item_duration: float = 14.0) -> Path:
    """Render a silent slideshow video with one static card per news item.

    Each item is shown for ``item_duration`` seconds as a black frame with the
    item's image centred and its "title: description" text wrapped near the
    bottom.

    Args:
        news (List[dict]): News items, each a dictionary with the keys
            "title", "description" and "image_data" (encoded image bytes or
            None). Legacy ``(title, description, image_data)`` tuples are
            accepted as well.
        audio_files (List[str]): Paths of the audio file for each item.
            Currently unused: the audio is not muxed into the video.
        video_name (str): File name of the video, without extension.
        fps (int, optional): Frames per second of the video. Defaults to 24.
        frame_size (Tuple[int, int], optional): Video resolution as
            (width, height). Defaults to (1080, 1920).
        item_duration (float, optional): Seconds to show each item.
            Defaults to 14.0.

    Returns:
        Path: Location of the written ``.mp4`` file.

    Raises:
        ValueError: If ``news`` is empty.
        RuntimeError: If the video writer cannot be opened.
    """
    if not news:
        raise ValueError("news must contain at least one item")

    # SAVE_PATH comes from os.getenv, so it is a plain string (or None when
    # the variable is unset); convert it before doing path arithmetic.
    output_dir = Path(SAVE_PATH) if SAVE_PATH else Path.cwd()
    output_dir.mkdir(parents=True, exist_ok=True)
    video_filename = output_dir / f"{video_name}.mp4"

    # fps * item_duration is a float; range() needs a whole frame count.
    frames_per_item = max(1, round(fps * item_duration))

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(str(video_filename), fourcc, fps, frame_size)
    try:
        if not out.isOpened():
            raise RuntimeError(f"Could not open video writer for {video_filename}")
        for item in news:
            # The card does not change while an item is on screen, so render
            # it once and write the same frame repeatedly.
            frame = _render_news_frame(item, frame_size)
            for _ in range(frames_per_item):
                out.write(frame)
    finally:
        out.release()

    return video_filename


def _split_news_item(item):
    """Return (title, description, image_data) from a dict or a 3-tuple item."""
    if isinstance(item, dict):
        return item.get("title", ""), item.get("description", ""), item.get("image_data")
    title, description, image_data = item
    return title, description, image_data


def _render_news_frame(item, frame_size):
    """Draw the static card (image plus wrapped caption) for one news item."""
    width, height = frame_size
    frame = np.zeros((height, width, 3), dtype=np.uint8)  # Black background

    title, description, image_data = _split_news_item(item)
    text = f"{title}: {description}"

    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 1.5
    font_thickness = 3
    max_line_width = width - 40
    wrapped_text = wrap_text(text, font, font_scale, max_line_width)

    img = data_to_image(image_data) if image_data else None
    if img is not None:
        try:
            # Normalise every PIL mode (palette, greyscale, RGBA, ...) to
            # 3-channel RGB, then swap to BGR, the order OpenCV frames use.
            img = img.convert("RGB").resize((800, 800), Image.Resampling.LANCZOS)
            img_np = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
        except Exception as e:
            # Best effort: a broken image must not abort the whole video.
            print(f"Failed to prepare image: {e}")
        else:
            img_y, img_x, _ = img_np.shape
            # Calculate the top-left corner that centres the image
            center_x = (width - img_x) // 2
            center_y = (height - img_y) // 2
            if center_x < 0 or center_y < 0:
                print("Image is too large to fit on the frame.")
            else:
                frame[center_y:center_y + img_y, center_x:center_x + img_x] = img_np

    y0, dy = 1500, 60  # Positioning text near the bottom for high-resolution
    for j, line in enumerate(wrapped_text):
        text_size = cv2.getTextSize(line, font, font_scale, font_thickness)[0]
        text_x = int((width - text_size[0]) / 2)
        text_y = y0 + j * dy
        cv2.putText(frame, line, (text_x, text_y), font, font_scale,
                    (255, 255, 255), font_thickness, lineType=cv2.LINE_AA)

    return frame


# Function to wrap text for video display

def wrap_text(text, font, font_scale, max_width, font_thickness=1):
    """Greedily split ``text`` into lines that fit within ``max_width`` pixels.

    Args:
        text: The string to wrap. Any run of whitespace (including newlines)
            is treated as a single word separator.
        font: OpenCV font face, e.g. ``cv2.FONT_HERSHEY_SIMPLEX``.
        font_scale: Font scale passed to ``cv2.getTextSize``.
        max_width: Maximum rendered line width in pixels.
        font_thickness: Stroke thickness used when measuring. Defaults to 1.

    Returns:
        A list of lines without leading/trailing spaces. A single word wider
        than ``max_width`` is kept on a line of its own rather than dropped
        or preceded by an empty line.
    """
    lines = []
    current_line = ''

    for word in text.split():
        candidate = f"{current_line} {word}" if current_line else word
        candidate_width = cv2.getTextSize(candidate, font, font_scale, font_thickness)[0][0]
        # An over-long word on an empty line has nowhere better to go, so
        # accept it instead of emitting an empty line first.
        if candidate_width <= max_width or not current_line:
            current_line = candidate
        else:
            lines.append(current_line)
            current_line = word

    if current_line:
        lines.append(current_line)
    return lines


def data_to_image(image_data):
    """Decode encoded image bytes (PNG, JPEG, ...) into a loaded PIL image.

    Args:
        image_data: Raw bytes of an encoded image, or None/empty.

    Returns:
        The decoded ``PIL.Image.Image``, or None if there is no data or the
        data cannot be decoded.
    """
    if not image_data:
        return None
    try:
        img = Image.open(io.BytesIO(image_data))
        # Image.open only reads the header; force the pixel data to decode
        # now so corrupt input is reported here, not later at resize time.
        img.load()
        return img
    except Exception as e:
        print(f"Failed to convert image data: {e}")
        return None


# TODO: Use ElevenLabs for text-to-speech instead of pyttsx3


# TODO: Alternatively, use a SoVITS-based TTS


# Function to convert text to speech and save as audio files

def text_to_speech(news: List[dict]) -> List[Path]:
    """Synthesize one WAV file per news item with pyttsx3.

    Files are named ``news_1.wav``, ``news_2.wav``, ... and are written
    relative to the current working directory.

    Args:
        news (List[dict]): News items with the keys "title" and "description".

    Returns:
        List[Path]: Relative paths of the audio files, in the same order as
        ``news``. Empty when ``news`` is empty.
    """
    if not news:
        # Nothing to synthesize, so do not start a speech engine at all.
        return []

    engine = pyttsx3.init()
    audio_files = []
    for i, news_data in enumerate(news, start=1):
        text = f"{news_data['title']}. {news_data['description']}"
        audio_path = Path(f"news_{i}.wav")
        # save_to_file only queues the job; runAndWait below processes it.
        engine.save_to_file(text, str(audio_path))
        audio_files.append(audio_path)
    engine.runAndWait()
    return audio_files


# # Function to convert text to speech and save as audio files

# def text_to_speech_one(news: dict) -> str:

#     engine = pyttsx3.init()

#     for i, news_data in enumerate(news):

#         title = news_data["title"]

#         description = news_data["description"]

#         text = f"{title}. {description}"

#         audio_path = f"news_{i + 1}.wav"

#         engine.save_to_file(text, audio_path)

#     engine.runAndWait()

#     return audio_path



---

File: /synthetic_tiktok/source_material.py

---


import time

from selenium import webdriver

from selenium.webdriver.firefox.service import Service as FirefoxService

from webdriver_manager.firefox import GeckoDriverManager

from bs4 import BeautifulSoup

from typing import List


# TODO: Also crawl the main article text itself

def fetch_bbc(limit: int = 3) -> List[dict]:
    """Scrape the top headlines from the BBC News front page using Firefox.

    Args:
        limit (int, optional): Maximum number of articles to fetch.
            Defaults to 3. Values <= 0 return an empty list without
            starting a browser.

    Returns:
        List[dict]: One dictionary per article with the keys:
                    "title": The headline text
                    "description": The text of the next <p> after the headline
                    "image_data": PNG bytes of a browser screenshot taken
                                  after navigating to the image URL, or None
        If fetching fails, a single placeholder item with the title "Error"
        is returned instead of raising.
    """
    if limit <= 0:
        return []

    print("Fetching news...")
    driver = None
    try:
        # Initialize the Firefox driver
        driver = webdriver.Firefox(service=FirefoxService(GeckoDriverManager().install()))
        driver.get("https://www.bbc.com/news")

        # Wait for the page to load completely
        time.sleep(20)  # Consider replacing with WebDriverWait for better reliability

        # Parse a snapshot of the page; later driver.get() calls for images
        # do not affect this already-parsed soup.
        soup = BeautifulSoup(driver.page_source, "html.parser")
        articles = soup.find_all('h2', {"data-testid": "card-headline"}, limit=limit)

        news = []
        for article in articles:
            title = article.text.strip() or "No title available."
            description_tag = article.find_next('p')
            description = description_tag.text.strip() if description_tag else "No description available."

            # Attempt to find the associated image
            image_tag = article.find_previous('img')
            image_data = None
            if image_tag and 'src' in image_tag.attrs:
                try:
                    driver.get(image_tag['src'])
                    time.sleep(2)  # Short wait to ensure image loads
                    image_data = driver.get_screenshot_as_png()
                except Exception as img_e:
                    # A missing image is not fatal; keep the text-only item.
                    print(f"Failed to fetch image: {img_e}")

            news.append({
                "title": title,
                "description": description,
                "image_data": image_data,
            })

        return news
    except Exception as e:
        print(f"Failed to fetch news: {e}")
        # Same dict shape as a normal item so consumers need no special case.
        return [{
            "title": "Error",
            "description": f"Failed to fetch news: {e}",
            "image_data": None,
        }]
    finally:
        # Always close the browser, including when scraping fails midway.
        if driver is not None:
            driver.quit()

    




---

File: /synthetic_tiktok/upload.py

---


# Function to upload video to GitHub

import subprocess



def upload_video_to_github(video_path):
    """Copy a video into the current directory, then commit and push it.

    The current working directory is expected to be inside the git
    repository that should receive the video; the push goes to
    ``origin main``.

    Args:
        video_path: Path (``str`` or ``pathlib.Path``) of the video file.

    Raises:
        subprocess.CalledProcessError: If any git command fails.
        OSError: If the file cannot be copied.
    """
    # Imported locally so this function does not rely on module-level imports.
    import shutil
    from pathlib import Path

    source = Path(video_path)
    destination = Path.cwd() / source.name
    # Copying a file onto itself raises SameFileError, so skip that case.
    if source.resolve() != destination.resolve():
        shutil.copy(source, destination)

    # Argument lists avoid shell quoting problems with spaces or special
    # characters in the file name, and work without PowerShell.
    git_commands = (
        ["git", "add", "--", source.name],
        ["git", "commit", "-m", f"Add {source.name}"],
        ["git", "push", "origin", "main"],
    )
    for git_command in git_commands:
        subprocess.run(git_command, check=True)

    

# TODO: upload to youtube via API

def upload_to_youtube():
    """Placeholder for a future YouTube upload; currently does nothing."""
    return None


# TODO: upload to tiktok via API



---

File: /main.py

---


# text_to_speech is defined in make_video (not upload), and the scraper in
# source_material is named fetch_bbc; alias it so the rest of this module
# can keep calling fetch_news().
from synthetic_tiktok.make_video import generate_video, text_to_speech
from synthetic_tiktok.upload import upload_video_to_github
from synthetic_tiktok.source_material import fetch_bbc as fetch_news

import threading
import tkinter as tk
import sys
import time
import os
from pathlib import Path
from dotenv import load_dotenv

load_dotenv()

# Directory the generated videos are written to. os.getenv returns a plain
# string (or None when unset), so wrap it in Path for .glob() and "/" and
# fall back to the current directory when the variable is missing.
SAVE_PATH = Path(os.getenv("SAVE_PATH") or ".")


def get_next_video_number():
    """Return the next free numeric video name in the SAVE_PATH directory.

    Scans ``*.mp4`` files whose stem is a whole number (``1.mp4``,
    ``12.mp4``, ...) and returns the highest number plus one. Files with
    non-numeric names are ignored.

    Returns:
        int: ``max(existing numbers) + 1``, or 1 when there are none.
    """
    # Imported locally so this function does not rely on module-level imports.
    from pathlib import Path

    # SAVE_PATH may be a str, a Path, or None depending on how it was loaded.
    video_dir = Path(SAVE_PATH or ".")
    # Compare numerically: sorting the stems as strings would rank "10"
    # before "9" and hand out a number that is already taken.
    numbers = [
        int(video.stem)
        for video in video_dir.glob("*.mp4")
        if video.stem.isdecimal()
    ]
    return max(numbers, default=0) + 1

    

    

# Function to display news and start automation

def display_news():
    """Button callback: launch the automation loop on a background thread.

    The thread is a daemon, so it does not keep the process alive once the
    main thread exits.
    """
    worker = threading.Thread(target=automate_video_generation, daemon=True)
    worker.start()
    print("Automation started. Check the console for updates.")


# Function to handle the video generation and upload process

def automate_video_generation():
    """Generate and upload videos: two test runs, then one every hour.

    The two test runs are three minutes apart; after that the function loops
    forever with a one-hour pause between videos. Any exception ends the
    loop after being printed. ``display_news`` starts this function on a
    worker thread, where ``sys.exit()`` ends only that thread.
    """
    try:
        # Initial Test: Generate and upload two videos with 3 minutes interval
        for test_run in range(2):
            _generate_and_upload_video()
            if test_run == 0:
                print("Waiting for 3 minutes before generating the next test video...")
                time.sleep(3 * 60)  # Wait for 3 minutes

        print("Initial test completed. Starting hourly video generation...")

        # Continuous Loop: Generate and upload a new video every hour
        while True:
            _generate_and_upload_video()
            print("Waiting for 1 hour before generating the next video...")
            time.sleep(60 * 60)  # Wait for 1 hour

    except KeyboardInterrupt:
        print("Automation stopped by user.")
        sys.exit()

    except Exception as e:
        print(f"An error occurred: {e}")
        sys.exit()


def _generate_and_upload_video():
    """Run one fetch -> narrate -> render -> upload cycle."""
    # Imported locally so this helper does not rely on module-level imports.
    from pathlib import Path

    video_number = get_next_video_number()
    news_data_list = fetch_news()
    audio_files = text_to_speech(news_data_list)
    video_path = generate_video(news_data_list, audio_files, video_number)
    if video_path is None:
        # generate_video may not return a path; it saves the file as
        # <SAVE_PATH>/<video_name>.mp4, so rebuild that location here.
        video_path = Path(SAVE_PATH or ".") / f"{video_number}.mp4"
    upload_video_to_github(video_path)
    print(f"Uploaded video {video_number}.mp4 to GitHub.")


# Initialize the Tkinter GUI

# Main window: a small fixed-size launcher with a single button.
root = tk.Tk()
root.title("Top News Video Generator")
root.geometry("300x150")

# Empty container frame packed above the button.
news_frame = tk.Frame(master=root)
news_frame.pack(expand=True, fill="both")

# Clicking the button hands control to display_news.
connect_button = tk.Button(
    master=root,
    text="Start Automation",
    font=("Arial", 12),
    command=display_news,
)
connect_button.pack(pady=50)

# Enter the Tk event loop; this call blocks until the window is closed.
root.mainloop()


No comments:

Post a Comment

https://youtu.be/jZ38C-M3tyk?si=ERjuJ8hqsQwV2vAp