I won’t know if this script works until I run it and see the errors, but the comments won’t start to generate until after all the posts are created, so I can’t debug that part until I’ve already created too much content.

import logging
import sqlite3
import time
from logging.handlers import RotatingFileHandler

import requests
import schedule
from pythorhead import Lemmy

from config import *

# Log to both a file and the console. The file handler rotates so that a
# process which runs indefinitely cannot grow debug.log without bound: each
# file is capped at roughly 1 MB and three rotated backups are kept.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
    handlers=[
        RotatingFileHandler("debug.log", maxBytes=1_000_000, backupCount=3),
        logging.StreamHandler(),
    ],
)


def initialize_database():
    """Open the SQLite database and make sure both tables exist.

    Creates the ``posts`` table (keyed by GitHub URL) and the ``comments``
    table (keyed by GitHub comment id) when they are missing, commits, and
    returns the still-open connection. The caller owns that connection.
    """
    posts_ddl = """
        CREATE TABLE IF NOT EXISTS posts (
            github_url TEXT PRIMARY KEY,
            lemmy_post_id INTEGER,
            lemmy_post_name TEXT,
            lemmy_post_body TEXT
        )
    """
    comments_ddl = """
        CREATE TABLE IF NOT EXISTS comments (
            github_comment_id INTEGER PRIMARY KEY,
            lemmy_comment_id INTEGER,
            comment_user TEXT,
            comment_body TEXT
        )
    """
    connection = sqlite3.connect(DB_FILE)
    db_cursor = connection.cursor()
    for ddl in (posts_ddl, comments_ddl):
        db_cursor.execute(ddl)
    connection.commit()
    return connection


def initialize_lemmy_instance():
    """Create a Lemmy client for the configured instance and log in.

    Uses LEMMY_INSTANCE_URL, LEMMY_USERNAME and LEMMY_PASSWORD from the
    configuration and returns the client object.
    """
    client = Lemmy(LEMMY_INSTANCE_URL)
    client.log_in(LEMMY_USERNAME, LEMMY_PASSWORD)
    logging.info("Initialized Lemmy instance")
    return client


def discover_community(lemmy, community_name):
    """Resolve *community_name* through the Lemmy client and return its id.

    The lookup is delegated to ``lemmy.discover_community``; whatever it
    returns is logged and handed back unchanged.
    """
    found_id = lemmy.discover_community(community_name)
    message = f"Discovered community {community_name} with ID {found_id}"
    logging.info(message)
    return found_id


def fetch_github_issues(repo):
    """Fetch the issue list for *repo* from the GitHub REST API.

    Args:
        repo: Repository in ``owner/name`` form.

    Returns:
        The decoded JSON body of the response. No pagination parameters are
        sent, so only the first page the API returns is included.

    Raises:
        requests.HTTPError: If GitHub answers with an error status (for
            example when rate limited).
        requests.RequestException: On connection problems or timeout.
    """
    url = f"{GITHUB_API_BASE}/repos/{repo}/issues"
    headers = {"Accept": "application/vnd.github+json"}
    # A timeout keeps a stalled connection from hanging the scheduler forever.
    response = requests.get(url, headers=headers, timeout=30)
    # Fail loudly on an error status: an error payload is a JSON object, not a
    # list of issues, and would otherwise break the caller in a confusing way.
    response.raise_for_status()
    logging.info(f"Fetched issues from {url}")
    return response.json()


def extract_issue_info(issue, repo):
    """Derive the Lemmy post fields from one GitHub issue.

    Args:
        issue: Issue mapping with the keys ``html_url``, ``state``,
            ``title``, ``number`` and ``body``.
        repo: Repository in ``owner/name`` form; used to pick the tag.

    Returns:
        A ``(issue_url, issue_title, issue_body)`` tuple. The title has the
        form ``"[Closed][BE] <title> #<number>"``; the ``[Closed]`` prefix is
        present only when the issue state is ``"closed"``.
    """
    issue_url = issue["html_url"]
    issue_state = "[Closed]" if issue["state"] == "closed" else ""
    # A repository such as "lemmy-ui" also contains the substring "lemmy", so
    # the substring test alone would tag the UI repository as backend.
    is_ui_repo = repo.lower().endswith("-ui")
    repo_abbr = "[BE]" if "lemmy" in repo and not is_ui_repo else "[UI]"
    issue_title = f"{issue_state}{repo_abbr} {issue['title']} #{issue['number']}"
    issue_body = issue["body"]
    return issue_url, issue_title, issue_body


def post_issues_to_lemmy(lemmy, community_id, repo):
    """Create a Lemmy post for every issue of *repo* not mirrored yet.

    An issue counts as mirrored when its GitHub URL already has a row in the
    ``posts`` table. Each new post is recorded and committed immediately, so
    an interruption does not lead to duplicates on the next run.

    Args:
        lemmy: Logged-in Lemmy client.
        community_id: Id of the community to post into.
        repo: Repository in ``owner/name`` form.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        cursor = conn.cursor()

        issues = fetch_github_issues(repo)
        for issue in issues:
            issue_url, issue_title, issue_body = extract_issue_info(issue, repo)

            cursor.execute("SELECT lemmy_post_id FROM posts WHERE github_url=?", (issue_url,))
            if cursor.fetchone():
                continue  # already mirrored

            post = lemmy.post.create(community_id, issue_title, url=issue_url, body=issue_body)["post_view"]["post"]
            lemmy_post_id = post["id"]
            lemmy_post_name = post["name"]
            # NOTE(review): the response may not carry a "body" key when the
            # issue has no description; .get() stores NULL instead of raising.
            lemmy_post_body = post.get("body")
            cursor.execute("INSERT INTO posts (github_url, lemmy_post_id, lemmy_post_name, lemmy_post_body) VALUES (?, ?, ?, ?)", (issue_url, lemmy_post_id, lemmy_post_name, lemmy_post_body))
            conn.commit()
            logging.info(f"Posted issue {issue_title} to community {community_id}")
    finally:
        # Release the connection even when a request or insert fails.
        conn.close()


def fetch_github_comments(repo, issue_number):
    """Fetch the comments of one issue from the GitHub REST API.

    Args:
        repo: Repository in ``owner/name`` form.
        issue_number: Number of the issue within that repository.

    Returns:
        The decoded JSON body of the response. No pagination parameters are
        sent, so only the first page the API returns is included.

    Raises:
        requests.HTTPError: If GitHub answers with an error status.
        requests.RequestException: On connection problems or timeout.
    """
    url = f"{GITHUB_API_BASE}/repos/{repo}/issues/{issue_number}/comments"
    headers = {"Accept": "application/vnd.github+json"}
    # A timeout keeps a stalled connection from hanging the scheduler forever.
    response = requests.get(url, headers=headers, timeout=30)
    # An error payload is a JSON object rather than a list of comments; stop
    # here instead of letting the caller iterate over its keys.
    response.raise_for_status()
    logging.info(f"Fetched comments for issue #{issue_number}")
    return response.json()


def post_comments_to_lemmy(lemmy, post_id, repo, issue_number):
    """Mirror the not-yet-copied GitHub comments of one issue to a Lemmy post.

    A comment counts as mirrored when its GitHub id already has a row in the
    ``comments`` table. Each new comment is recorded and committed right
    after it is created on Lemmy.

    Args:
        lemmy: Logged-in Lemmy client.
        post_id: Id of the Lemmy post that mirrors the issue.
        repo: Repository in ``owner/name`` form.
        issue_number: Number of the GitHub issue.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        cursor = conn.cursor()

        github_comments = fetch_github_comments(repo, issue_number)
        for comment in github_comments:
            github_comment_id = comment["id"]
            cursor.execute("SELECT lemmy_comment_id FROM comments WHERE github_comment_id=?", (github_comment_id,))
            if cursor.fetchone():
                continue  # already mirrored

            comment_user = comment["user"]["login"]
            comment_body = comment["body"]
            # The create-comment response nests the comment under
            # "comment_view", mirroring "post_view" for posts.
            created = lemmy.comment.create(post_id, comment_body)
            lemmy_comment_id = created["comment_view"]["comment"]["id"]

            cursor.execute("INSERT INTO comments (github_comment_id, lemmy_comment_id, comment_user, comment_body) VALUES (?, ?, ?, ?)", (github_comment_id, lemmy_comment_id, comment_user, comment_body))
            conn.commit()
            logging.info(f"Posted comment {github_comment_id} to lemmy post {post_id}")
    finally:
        # Release the connection even when a request or insert fails.
        conn.close()


def fetch_issue_data(repo):
    """Return the stored GitHub URL and Lemmy post id of each mirrored issue.

    Args:
        repo: Repository in ``owner/name`` form.

    Returns:
        A list of ``(github_url, lemmy_post_id)`` rows from the ``posts``
        table whose URL starts with ``https://github.com/<repo>/issues/``.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT github_url, lemmy_post_id FROM posts WHERE github_url LIKE ?", (f"https://github.com/{repo}/issues/%",))
        return cursor.fetchall()
    finally:
        # The rows are fully fetched above, so the connection can be released.
        conn.close()


def extract_issue_number(github_url):
    """Return the integer found after the last ``/`` of *github_url*.

    Raises ValueError when that trailing segment is not a valid integer.
    """
    _, _, last_segment = github_url.rpartition("/")
    return int(last_segment)


def main():
    """Run one full mirroring pass over every configured repository.

    Ensures the database tables exist, logs in to Lemmy, resolves the target
    community, then for each repository posts new issues followed by the new
    comments of every issue already stored in the database.
    """
    logging.info("Running main function")
    # Only the table setup is needed here; the helpers open their own
    # connections, so release this one immediately instead of leaking it.
    initialize_database().close()
    lemmy = initialize_lemmy_instance()
    community_id = discover_community(lemmy, LEMMY_COMMUNITY_NAME)
    for repo in REPOSITORIES:
        post_issues_to_lemmy(lemmy, community_id, repo)
        issue_data = fetch_issue_data(repo)
        for github_url, lemmy_post_id in issue_data:
            # Call the helper: passing the function object itself would put
            # its repr into the comments URL.
            issue_number = extract_issue_number(github_url)
            post_comments_to_lemmy(lemmy, lemmy_post_id, repo, issue_number)


def run_periodically():
    """Run ``main`` once right away, then again every two hours, forever.

    The scheduler is polled once a minute; this function never returns.
    """
    poll_interval_seconds = 60

    main()
    two_hourly = schedule.every(2).hours
    two_hourly.do(main)

    while True:
        schedule.run_pending()
        time.sleep(poll_interval_seconds)


# Script entry point: start the mirroring loop only when the file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    logging.info("Starting script")
    run_periodically()
  • muppetjones
    link
    fedilink
    arrow-up
    2
    ·
    1 year ago

    Also, you probably want a rotating file handler for the logging.