From 48a68b1668af39c11c43952c3d4ec3f34baca943 Mon Sep 17 00:00:00 2001 From: erwinmsmith <1262214827@qq.com> Date: Sat, 11 Apr 2026 20:40:18 +0800 Subject: [PATCH] feat: add plug-and-play scraper module with X (Twitter) support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds src/scrapers/ as a fully decoupled data ingestion layer that fetches posts from social platforms and normalises them into THETA-compatible CSV files (data/{dataset}/{dataset}_cleaned.csv). Zero changes to src/models/ — the module hooks in via the existing find_data_file() discovery path in prepare_data.py. Structure: - base.py AbstractScraper protocol for all platforms - registry.py Platform registry (mirrors model/registry.py pattern) - adapter.py ThetaAdapter: normalises raw records → THETA CSV schema - cli.py Argparse entry point (python src/scrapers/cli.py) - platforms/x.py XScraper via tweepy + Twitter API v2 Also adds scripts/scrape.sh (bash wrapper) and X credential stubs to .env.example. Future platforms extend AbstractScraper and register in registry.py without touching any other file. 
--- .env.example | 16 ++ .gitignore | 3 + scripts/scrape.sh | 52 ++++++ src/scrapers/__init__.py | 3 + src/scrapers/adapter.py | 168 ++++++++++++++++++++ src/scrapers/base.py | 51 ++++++ src/scrapers/cli.py | 245 +++++++++++++++++++++++++++++ src/scrapers/platforms/__init__.py | 0 src/scrapers/platforms/x.py | 218 +++++++++++++++++++++++++ src/scrapers/registry.py | 79 ++++++++++ 10 files changed, 835 insertions(+) create mode 100755 scripts/scrape.sh create mode 100644 src/scrapers/__init__.py create mode 100644 src/scrapers/adapter.py create mode 100644 src/scrapers/base.py create mode 100644 src/scrapers/cli.py create mode 100644 src/scrapers/platforms/__init__.py create mode 100644 src/scrapers/platforms/x.py create mode 100644 src/scrapers/registry.py diff --git a/.env.example b/.env.example index 37f2562..f31925f 100644 --- a/.env.example +++ b/.env.example @@ -95,6 +95,22 @@ CUDA_VISIBLE_DEVICES=0 # Log level: DEBUG, INFO, WARNING, ERROR LOG_LEVEL=INFO +# ============================================================================= +# Scraper Module (src/scrapers/) — optional, independent of THETA core +# ============================================================================= + +# X (Twitter) API v2 — https://developer.twitter.com/en/portal/dashboard +# Only X_BEARER_TOKEN is required for keyword search (app-only auth). +# The remaining keys are needed only for OAuth 1.0a user-context endpoints. +X_BEARER_TOKEN= +# X_API_KEY= +# X_API_SECRET= +# X_ACCESS_TOKEN= +# X_ACCESS_SECRET= + +# Future platform credentials can be added here following the same pattern. +# See src/scrapers/registry.py for the full list of supported platforms. + # ============================================================================= # Note: Hyperparameters should be set in config/default.yaml, NOT here. # This file is ONLY for physical paths and environment settings. 
diff --git a/.gitignore b/.gitignore index 041d1e9..c9faf5e 100644 --- a/.gitignore +++ b/.gitignore @@ -231,3 +231,6 @@ data/* # Model weights (keep directory structure but ignore model files) models/* !models/.gitkeep + +# IDE +CLAUDE.md \ No newline at end of file diff --git a/scripts/scrape.sh b/scripts/scrape.sh new file mode 100755 index 0000000..c5bd2fb --- /dev/null +++ b/scripts/scrape.sh @@ -0,0 +1,52 @@ +#!/usr/bin/env bash +# ============================================================================= +# scrape.sh — Plug-and-play data scraper for THETA +# +# Fetches posts from a social media platform and saves them as a +# THETA-compatible CSV in DATA_DIR/{dataset}/{dataset}_cleaned.csv. +# +# Usage: +# bash scripts/scrape.sh --platform x \ +# --keywords "AI policy" "climate change" \ +# --dataset my_research \ +# --max-results 1000 +# +# bash scripts/scrape.sh --list-platforms +# +# All arguments are forwarded directly to src/scrapers/cli.py. +# See that file (or run with --help) for the full option list. 
# =============================================================================

set -euo pipefail

# ---------------------------------------------------------------------------
# Locate project root (the parent of the directory containing this script)
# ---------------------------------------------------------------------------
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(dirname "$SCRIPT_DIR")"

# ---------------------------------------------------------------------------
# Load environment variables
# ---------------------------------------------------------------------------
if [ -f "$PROJECT_ROOT/.env" ]; then
    # `set -a` exports every variable the file assigns. nounset is relaxed
    # while sourcing so a .env line that references an unset variable does
    # not abort the whole script.
    set -a
    set +u
    # shellcheck disable=SC1090,SC1091
    source "$PROJECT_ROOT/.env"
    set -u
    set +a
fi

# ---------------------------------------------------------------------------
# Activate conda environment if present (mirrors quick_start.sh behaviour)
# ---------------------------------------------------------------------------
CONDA_ENV_NAME="${CONDA_ENV_NAME:-theta}"
if command -v conda &>/dev/null; then
    # conda's shell hooks may reference unset variables; under `set -u` that
    # kills the script even with `|| true`, so nounset is disabled for the
    # duration of the (best-effort) activation.
    set +u
    # shellcheck disable=SC1090,SC1091
    source "$(conda info --base)/etc/profile.d/conda.sh" 2>/dev/null || true
    conda activate "$CONDA_ENV_NAME" 2>/dev/null || true
    set -u
fi

# ---------------------------------------------------------------------------
# Run the Python CLI from the project root.
#
# Running a script by path puts the *script's* directory (src/scrapers) on
# sys.path, not the current directory, so `cd` alone does not make the
# `src.scrapers.*` imports resolvable. Export the project root explicitly.
# ---------------------------------------------------------------------------
cd "$PROJECT_ROOT"
export PYTHONPATH="$PROJECT_ROOT${PYTHONPATH:+:$PYTHONPATH}"
exec python src/scrapers/cli.py "$@"
diff --git a/src/scrapers/adapter.py b/src/scrapers/adapter.py new file mode 100644 index 0000000..d5da375 --- /dev/null +++ b/src/scrapers/adapter.py @@ -0,0 +1,168 @@ +""" +ThetaAdapter — middleware between scrapers and the THETA pipeline. + +Responsibilities +---------------- +- Accept raw records from any AbstractScraper +- Normalize to THETA's strict CSV schema (text, timestamp, cov_*) +- Drop rows where text is empty after stripping +- Append to an existing dataset file (de-duplicate by "id" if present) +- Write the final CSV to DATA_DIR/{dataset_name}/{dataset_name}_cleaned.csv + +THETA's prepare_data.py:find_data_file() looks for *_cleaned.csv first, +so no changes to THETA core are needed. +""" + +import os +import re +from pathlib import Path + +import pandas as pd + + +# --------------------------------------------------------------------------- +# Path resolution (mirrors THETA's config.py logic without importing it) +# --------------------------------------------------------------------------- + +def _resolve_data_dir() -> Path: + """ + Resolve DATA_DIR from the environment, falling back to /data. + This mirrors the logic in src/models/config.py:get_absolute_path(). + """ + data_dir = os.environ.get("DATA_DIR", "").strip() + if data_dir: + return Path(data_dir) + # Walk up from this file to find the project root (src/scrapers/adapter.py → root) + project_root = Path(__file__).resolve().parents[2] + return project_root / "data" + + +# --------------------------------------------------------------------------- +# Adapter +# --------------------------------------------------------------------------- + +class ThetaAdapter: + """ + Converts raw scraper output into a THETA-compatible CSV file. + + Parameters + ---------- + data_dir : Path | None + Override DATA_DIR resolution. If None, reads from environment. 
+ """ + + # Columns that THETA treats specially — everything else is kept as-is + _THETA_TEXT_COL = "text" + _THETA_TIME_COL = "timestamp" + _COV_PREFIX = "cov_" + _ID_COL = "id" + + def __init__(self, data_dir: Path | None = None): + self.data_dir = Path(data_dir) if data_dir else _resolve_data_dir() + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def save( + self, + records: list[dict], + dataset_name: str, + verbose: bool = True, + ) -> Path: + """ + Normalize records and write (or append) to the dataset CSV. + + Parameters + ---------- + records : list[dict] + Raw records from a scraper. Each must have at least a "text" key. + dataset_name : str + Determines the output directory and filename: + DATA_DIR/{dataset_name}/{dataset_name}_cleaned.csv + verbose : bool + Print progress information. + + Returns + ------- + Path + Absolute path to the written CSV file. + """ + if not records: + raise ValueError("No records to save — scraper returned an empty list.") + + df_new = self._normalize(records) + + if verbose: + print(f"[adapter] Normalized {len(df_new)} records.") + + output_path = self._output_path(dataset_name) + output_path.parent.mkdir(parents=True, exist_ok=True) + + if output_path.exists(): + df_existing = pd.read_csv(output_path, dtype=str) + df_combined = self._merge(df_existing, df_new) + if verbose: + added = len(df_combined) - len(df_existing) + print( + f"[adapter] Existing file has {len(df_existing)} rows. " + f"Adding {added} new rows (after deduplication)." 
+ ) + else: + df_combined = df_new + + df_combined.to_csv(output_path, index=False, encoding="utf-8") + + if verbose: + print(f"[adapter] Saved {len(df_combined)} rows → {output_path}") + + return output_path + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _normalize(self, records: list[dict]) -> pd.DataFrame: + """Convert raw dicts to a clean DataFrame with THETA-compatible columns.""" + df = pd.DataFrame(records) + + # Ensure "text" column exists + if self._THETA_TEXT_COL not in df.columns: + raise KeyError( + f"Scraper records must contain a '{self._THETA_TEXT_COL}' key. " + f"Got columns: {list(df.columns)}" + ) + + # Drop rows with empty text + df[self._THETA_TEXT_COL] = df[self._THETA_TEXT_COL].fillna("").str.strip() + df = df[df[self._THETA_TEXT_COL] != ""].copy() + + if df.empty: + raise ValueError("All records have empty text after stripping whitespace.") + + # Build output column order: text first, then timestamp, then cov_*, then rest + ordered_cols = [self._THETA_TEXT_COL] + if self._THETA_TIME_COL in df.columns: + ordered_cols.append(self._THETA_TIME_COL) + cov_cols = [c for c in df.columns if c.startswith(self._COV_PREFIX)] + ordered_cols.extend(sorted(cov_cols)) + # Add remaining columns (id and any platform-specific extras) + remaining = [ + c for c in df.columns if c not in ordered_cols + ] + ordered_cols.extend(remaining) + + return df[ordered_cols].reset_index(drop=True) + + def _merge(self, existing: pd.DataFrame, new: pd.DataFrame) -> pd.DataFrame: + """Append new rows, deduplicating by 'id' if that column is present.""" + combined = pd.concat([existing, new], ignore_index=True) + if self._ID_COL in combined.columns: + combined = combined.drop_duplicates( + subset=[self._ID_COL], keep="first" + ).reset_index(drop=True) + return combined + + def _output_path(self, dataset_name: str) -> Path: + safe_name = 
"""
Common interface shared by every platform scraper.

Adding a platform takes three steps:
  1. Subclass AbstractScraper
  2. Implement fetch()
  3. Register the class in src/scrapers/registry.py

What fetch() must return
------------------------
A list of plain dicts. Every dict MUST carry:
  - "text"      : str        — the main text content

and MAY carry (forwarded to the THETA CSV as-is):
  - "timestamp" : str | int  — ISO date string or year integer (needed by DTM)
  - "id"        : str        — unique post ID (used for deduplication in adapter)
  - "cov_*"     : any        — covariate columns (needed by STM); key must start with "cov_"

Keys beyond these are kept in the output CSV so downstream analysis is not limited.
"""

from abc import ABC, abstractmethod


class AbstractScraper(ABC):
    """Parent class for all platform scrapers; subclasses supply fetch()."""

    @abstractmethod
    def fetch(self, keywords: list[str], max_results: int, **kwargs) -> list[dict]:
        """
        Collect posts that match the given keywords.

        Parameters
        ----------
        keywords : list[str]
            Search terms; several terms are OR-combined.
        max_results : int
            Rough cap on how many records come back. Pagination boundaries
            may push an implementation slightly past it.
        **kwargs
            Options specific to one platform (e.g. lang, start_time, end_time).

        Returns
        -------
        list[dict]
            Raw records following the contract in the module docstring.
        """

    def name(self) -> str:
        """Return a human-readable platform label for log output."""
        label = self.__class__.__name__
        return label
+ """ + + def name(self) -> str: + """Human-readable platform name for logging.""" + return self.__class__.__name__ diff --git a/src/scrapers/cli.py b/src/scrapers/cli.py new file mode 100644 index 0000000..f821ee3 --- /dev/null +++ b/src/scrapers/cli.py @@ -0,0 +1,245 @@ +""" +Scraper CLI entry point for the THETA project. + +Usage +----- + python src/scrapers/cli.py --platform x \\ + --keywords "AI policy" "climate change" \\ + --dataset my_research \\ + --max-results 1000 + + # List available platforms + python src/scrapers/cli.py --list-platforms + +After running, feed the dataset directly into THETA: + python src/models/run_pipeline.py --dataset my_research --models theta +""" + +import argparse +import os +import sys +from pathlib import Path + +# --------------------------------------------------------------------------- +# Bootstrap: load .env before anything else (mirrors THETA core behaviour) +# --------------------------------------------------------------------------- + +def _load_dotenv(root: Path) -> None: + env_file = root / ".env" + if not env_file.exists(): + return + try: + from dotenv import load_dotenv + load_dotenv(env_file, override=False) + except ImportError: + # Minimal manual parser (same approach as THETA's config.py) + with open(env_file, encoding="utf-8") as fh: + for line in fh: + line = line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, _, value = line.partition("=") + key = key.strip() + value = value.strip().strip('"').strip("'") + if key and key not in os.environ: + os.environ[key] = value + + +_PROJECT_ROOT = Path(__file__).resolve().parents[2] +_load_dotenv(_PROJECT_ROOT) + +# --------------------------------------------------------------------------- +# Imports (after env is loaded) +# --------------------------------------------------------------------------- + +from src.scrapers.registry import get_scraper, list_platforms, SCRAPER_REGISTRY # noqa: E402 +from src.scrapers.adapter import 
# ---------------------------------------------------------------------------
# Imports (after env is loaded)
# ---------------------------------------------------------------------------

from src.scrapers.registry import get_scraper, list_platforms, SCRAPER_REGISTRY  # noqa: E402
from src.scrapers.adapter import ThetaAdapter  # noqa: E402


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def _positive_int(raw: str) -> int:
    """argparse type: parse ``raw`` as an integer and require it to be >= 1."""
    try:
        value = int(raw)
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"expected an integer, got {raw!r}"
        ) from None
    if value < 1:
        raise argparse.ArgumentTypeError(
            f"must be a positive integer, got {value}"
        )
    return value


def build_parser() -> argparse.ArgumentParser:
    """
    Build the argument parser for the scraper CLI.

    ``--platform``, ``--keywords`` and ``--dataset`` are deliberately not
    marked ``required`` here so that ``--list-platforms`` can be used on its
    own; ``main()`` enforces them for an actual scrape.

    Returns
    -------
    argparse.ArgumentParser
        Parser exposing every option documented in the epilog examples.
    """
    parser = argparse.ArgumentParser(
        prog="python src/scrapers/cli.py",
        description="Fetch social media data and save it as a THETA-compatible CSV.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Fetch English tweets about AI policy (last 7 days)
  python src/scrapers/cli.py --platform x \\
      --keywords "AI policy" "machine learning" \\
      --dataset ai_policy_2024 --max-results 500

  # Fetch with time range (Pro/Academic tier required for full archive)
  python src/scrapers/cli.py --platform x \\
      --keywords "climate change" \\
      --dataset climate_2023 --max-results 2000 \\
      --lang en --start-time 2023-01-01 --end-time 2023-12-31 \\
      --full-archive

  # List available platforms
  python src/scrapers/cli.py --list-platforms
        """,
    )

    parser.add_argument(
        "--platform", "-p",
        # Lower-case before the `choices` check so "X" is accepted, matching
        # get_scraper(), which lower-cases the platform name itself.
        type=str.lower,
        default=None,
        choices=list(SCRAPER_REGISTRY.keys()),
        help="Social media platform to scrape.",
    )
    parser.add_argument(
        "--keywords", "-k",
        nargs="+",
        metavar="KEYWORD",
        help="One or more search keywords. Multiple terms are combined with OR.",
    )
    parser.add_argument(
        "--dataset", "-d",
        type=str,
        help=(
            "Dataset name. Determines output path: "
            "DATA_DIR/{dataset}/{dataset}_cleaned.csv"
        ),
    )
    parser.add_argument(
        "--max-results", "-n",
        # Zero or negative values can never fetch anything; reject them here
        # instead of reporting "no records" after the fact.
        type=_positive_int,
        default=500,
        help="Approximate maximum number of posts to collect (default: 500).",
    )
    parser.add_argument(
        "--lang",
        type=str,
        default="en",
        help=(
            "Language filter (BCP-47 code, e.g. 'en', 'zh', 'ja'). "
            "Pass '' to disable. Default: 'en'."
        ),
    )
    parser.add_argument(
        "--start-time",
        type=str,
        default=None,
        metavar="YYYY-MM-DD",
        help="Earliest post time (ISO 8601). Basic API tier: last 7 days only.",
    )
    parser.add_argument(
        "--end-time",
        type=str,
        default=None,
        metavar="YYYY-MM-DD",
        help="Latest post time (ISO 8601).",
    )
    parser.add_argument(
        "--full-archive",
        action="store_true",
        help="Use full-archive search (Pro/Academic API tier required).",
    )
    parser.add_argument(
        "--include-retweets",
        action="store_true",
        help="Include retweets (excluded by default).",
    )
    parser.add_argument(
        "--include-replies",
        action="store_true",
        help="Include replies (excluded by default).",
    )
    parser.add_argument(
        "--output-dir",
        type=str,
        default=None,
        help=(
            "Override DATA_DIR for the output path. "
            "Defaults to DATA_DIR from .env or PROJECT_ROOT/data."
        ),
    )
    parser.add_argument(
        "--list-platforms",
        action="store_true",
        help="List all registered scraper platforms and exit.",
    )
    parser.add_argument(
        "--quiet", "-q",
        action="store_true",
        help="Suppress progress output.",
    )

    return parser
def _fail(message: str) -> None:
    """Print ``message`` to stderr and exit with status 1."""
    print(message, file=sys.stderr)
    sys.exit(1)


def main(argv=None) -> None:
    """
    Run the scraper CLI: parse arguments, fetch posts, write the dataset CSV.

    Parameters
    ----------
    argv : list[str] | None
        Argument list to parse; None means ``sys.argv[1:]``.

    Exits with status 0 after ``--list-platforms``, status 2 on argument
    errors (argparse), and status 1 when the scraper cannot be initialised,
    the fetch is rejected, nothing is returned, or nothing can be saved.
    """
    parser = build_parser()
    args = parser.parse_args(argv)

    if args.list_platforms:
        list_platforms()
        sys.exit(0)

    # Validate required arguments for scraping
    if not args.platform:
        parser.error("--platform is required. Use --list-platforms to see options.")
    if not args.keywords:
        parser.error("--keywords is required.")
    if not args.dataset:
        parser.error("--dataset is required.")

    verbose = not args.quiet

    # Instantiate scraper. A missing token (EnvironmentError, an alias of
    # OSError) or a missing dependency (ImportError) is a user-facing
    # configuration problem, not a crash: report it without a traceback.
    if verbose:
        print(f"[scraper] Initialising platform: {args.platform}")
    try:
        scraper = get_scraper(args.platform)
    except (OSError, ImportError, ValueError) as exc:
        _fail(f"[scraper] Could not initialise platform '{args.platform}': {exc}")

    # Build platform-specific kwargs
    fetch_kwargs: dict = {}
    if args.platform == "x":
        fetch_kwargs = {
            "lang": args.lang,
            "start_time": args.start_time,
            "end_time": args.end_time,
            "use_full_archive": args.full_archive,
            "exclude_retweets": not args.include_retweets,
            "exclude_replies": not args.include_replies,
            # Lets --quiet reach the scraper; a fetch() that does not know
            # this option simply absorbs it through **kwargs.
            "verbose": verbose,
        }

    # Fetch
    if verbose:
        print(
            f"[scraper] Fetching up to {args.max_results} posts "
            f"for keywords: {args.keywords}"
        )
    try:
        records = scraper.fetch(
            keywords=args.keywords,
            max_results=args.max_results,
            **fetch_kwargs,
        )
    except ValueError as exc:
        # e.g. an unparseable --start-time / --end-time value
        _fail(f"[scraper] Invalid request: {exc}")

    if not records:
        _fail("[scraper] No records returned. Nothing saved.")

    # Save via adapter
    adapter = ThetaAdapter(
        data_dir=Path(args.output_dir) if args.output_dir else None
    )
    try:
        output_path = adapter.save(records, dataset_name=args.dataset, verbose=verbose)
    except (ValueError, KeyError) as exc:
        _fail(f"[scraper] Could not save records: {exc}")

    if verbose:
        # The adapter may sanitise the dataset name (e.g. spaces → "_"); the
        # directory it actually wrote to is the name the pipeline must use.
        saved_name = output_path.parent.name
        print(
            f"\n[scraper] Done.\n"
            f"  Dataset : {saved_name}\n"
            f"  Records : {len(records)}\n"
            f"  Output  : {output_path}\n"
            f"\nNext step:\n"
            f"  python src/models/run_pipeline.py "
            f"--dataset {saved_name} --models theta\n"
        )


if __name__ == "__main__":
    main()
"""
XScraper — fetches posts from X (Twitter) using the official API v2.

Requirements
------------
    pip install tweepy

Environment variables (set in .env)
------------------------------------
    X_BEARER_TOKEN   — required for all search queries (app-only auth)
    X_API_KEY        — optional, only needed for OAuth 1.0a user-context endpoints
    X_API_SECRET     — optional
    X_ACCESS_TOKEN   — optional
    X_ACCESS_SECRET  — optional

API tier notes
--------------
- Basic tier (free developer account): search_recent_tweets only (last 7 days).
- Pro / Academic tier: full-archive search via search_all_tweets.
  Set use_full_archive=True in fetch() to enable.
"""

import os
import time
import logging
from datetime import datetime, timezone

from src.scrapers.base import AbstractScraper

logger = logging.getLogger(__name__)


class XScraper(AbstractScraper):
    """Scraper for X (Twitter) using tweepy and API v2."""

    # Largest page this scraper requests per API call
    _PAGE_SIZE = 100
    # Smallest page size the search endpoints accept
    _MIN_PAGE_SIZE = 10

    def __init__(self):
        """
        Create the tweepy client from environment credentials.

        Raises
        ------
        EnvironmentError
            If X_BEARER_TOKEN is unset or blank.
        ImportError
            If tweepy is not installed.
        """
        bearer_token = os.environ.get("X_BEARER_TOKEN", "").strip()
        if not bearer_token:
            raise EnvironmentError(
                "X_BEARER_TOKEN is not set. "
                "Add it to your .env file before using the X scraper."
            )
        try:
            import tweepy  # noqa: PLC0415
        except ImportError as exc:
            raise ImportError(
                "tweepy is required for the X scraper. Install it with:\n"
                "    pip install tweepy"
            ) from exc

        self._tweepy = tweepy
        self._client = tweepy.Client(
            bearer_token=bearer_token,
            # Blank values are passed as None so unset OAuth keys are omitted.
            consumer_key=os.environ.get("X_API_KEY") or None,
            consumer_secret=os.environ.get("X_API_SECRET") or None,
            access_token=os.environ.get("X_ACCESS_TOKEN") or None,
            access_token_secret=os.environ.get("X_ACCESS_SECRET") or None,
            wait_on_rate_limit=True,  # tweepy handles 429 back-off automatically
        )

    def name(self) -> str:
        """Human-readable platform name for logging."""
        return "X (Twitter)"

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def fetch(
        self,
        keywords: list[str],
        max_results: int = 500,
        lang: str = "en",
        start_time: str | None = None,
        end_time: str | None = None,
        use_full_archive: bool = False,
        exclude_retweets: bool = True,
        exclude_replies: bool = False,
        verbose: bool = True,
        **kwargs,
    ) -> list[dict]:
        """
        Fetch tweets matching the given keywords.

        Parameters
        ----------
        keywords : list[str]
            Search terms. Combined with OR. Example: ["AI policy", "machine learning"].
        max_results : int
            Upper bound on the number of tweets returned. Fewer may come back
            if the search runs dry or the API reports an error.
        lang : str
            BCP-47 language code, e.g. "en", "zh", "ja". Pass "" to skip lang filter.
        start_time : str | None
            ISO 8601 start time, e.g. "2024-01-01" or "2024-01-01T00:00:00Z".
            Basic tier is limited to the last 7 days.
        end_time : str | None
            ISO 8601 end time. A date-only value means 00:00:00 UTC of that
            day, so that day itself is not included.
        use_full_archive : bool
            Use search_all_tweets (Pro/Academic tier) instead of search_recent_tweets.
        exclude_retweets : bool
            Append "-is:retweet" to the query.
        exclude_replies : bool
            Append "-is:reply" to the query.
        verbose : bool
            Print progress lines to stdout. API errors are always logged.
        **kwargs
            Ignored; accepted so callers can pass options meant for other platforms.

        Returns
        -------
        list[dict]
            Records with keys: text, timestamp, id, cov_lang, cov_author_id.

        Raises
        ------
        ValueError
            If no usable keyword is given or a time string cannot be parsed.
        """
        query = self._build_query(
            keywords, lang, exclude_retweets, exclude_replies
        )
        logger.info("[X] Query: %s | max_results=%d", query, max_results)
        if verbose:
            print(f"[x] Search query: {query}")

        # Parse the window once, before any request: invalid input fails
        # fast and the values are not re-parsed for every page.
        start_dt = self._parse_time(start_time) if start_time else None
        end_dt = self._parse_time(end_time) if end_time else None

        search_fn = (
            self._client.search_all_tweets
            if use_full_archive
            else self._client.search_recent_tweets
        )

        tweet_fields = ["created_at", "lang", "author_id"]
        records: list[dict] = []
        next_token = None

        while len(records) < max_results:
            remaining = max_results - len(records)
            # Clamp to the page-size range the API accepts.
            batch_size = max(min(self._PAGE_SIZE, remaining), self._MIN_PAGE_SIZE)

            try:
                response = search_fn(
                    query=query,
                    max_results=batch_size,
                    tweet_fields=tweet_fields,
                    start_time=start_dt,
                    end_time=end_dt,
                    next_token=next_token,
                )
            except self._tweepy.errors.TweepyException as exc:
                # Best effort: keep what was collected so far.
                logger.error("[X] API error: %s", exc)
                if verbose:
                    print(f"[x] API error: {exc}")
                break

            if not response.data:
                logger.info("[X] No more results.")
                break

            records.extend(self._to_record(tweet) for tweet in response.data)
            if verbose:
                print(f"[x] Fetched {len(records)} tweets so far...")

            meta = response.meta or {}
            next_token = meta.get("next_token")
            if not next_token:
                break

        # The minimum page size can overshoot a small or nearly-met target.
        if len(records) > max_results:
            records = records[:max_results]

        if verbose:
            print(f"[x] Done. Total tweets collected: {len(records)}")
        return records

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _build_query(
        self,
        keywords: list[str],
        lang: str,
        exclude_retweets: bool,
        exclude_replies: bool,
    ) -> str:
        """
        Build the search query string.

        Each keyword becomes a quoted exact phrase; several phrases are
        OR-ed inside parentheses. Blank keywords are dropped and embedded
        double quotes are removed, since either would produce a malformed
        query.

        Raises
        ------
        ValueError
            If no non-blank keyword remains.
        """
        phrases = []
        for kw in keywords or []:
            cleaned = " ".join(str(kw).replace('"', " ").split())
            if cleaned:
                phrases.append(f'"{cleaned}"')
        if not phrases:
            raise ValueError("At least one non-empty keyword is required.")

        if len(phrases) == 1:
            keyword_part = phrases[0]
        else:
            keyword_part = f"({' OR '.join(phrases)})"

        parts = [keyword_part]
        if lang:
            parts.append(f"lang:{lang}")
        if exclude_retweets:
            parts.append("-is:retweet")
        if exclude_replies:
            parts.append("-is:reply")

        return " ".join(parts)

    def _to_record(self, tweet) -> dict:
        """Map one tweet object to the record dict described in fetch()."""
        ts = None
        created = tweet.created_at
        if created:
            # The timestamp is written with a literal "Z" suffix, so
            # normalise aware datetimes to UTC first.
            if created.tzinfo is not None:
                created = created.astimezone(timezone.utc)
            ts = created.strftime("%Y-%m-%dT%H:%M:%SZ")

        return {
            "text": tweet.text,
            "timestamp": ts,
            "id": str(tweet.id),
            "cov_lang": getattr(tweet, "lang", None),
            "cov_author_id": str(tweet.author_id) if tweet.author_id else None,
        }

    @staticmethod
    def _parse_time(time_str: str) -> datetime:
        """
        Parse 'YYYY-MM-DD' or a full ISO 8601 timestamp into an aware UTC datetime.

        A trailing "Z" and explicit offsets such as "+02:00" are accepted;
        values without an offset are taken to be UTC.

        Raises
        ------
        ValueError
            If the string is not a recognisable ISO 8601 date or datetime.
        """
        text = time_str.strip()
        if text[-1:] in ("Z", "z"):
            # datetime.fromisoformat() only understands "Z" from Python 3.11.
            text = text[:-1] + "+00:00"
        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            raise ValueError(
                f"Cannot parse time string '{time_str}'. "
                "Expected format: YYYY-MM-DD or YYYY-MM-DDTHH:MM:SSZ"
            ) from None
        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)
import importlib

# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------

# Maps a platform key to where its scraper class lives ("module", "class"),
# the environment variables it needs ("required_env", shown by
# list_platforms) and a one-line "description".
SCRAPER_REGISTRY: dict[str, dict] = {
    "x": {
        "module": "src.scrapers.platforms.x",
        "class": "XScraper",
        "required_env": ["X_BEARER_TOKEN"],
        "description": "X (Twitter) via API v2 — requires a developer bearer token",
    },
    # Future platforms:
    # "weibo": {
    #     "module": "src.scrapers.platforms.weibo",
    #     "class": "WeiboScraper",
    #     "required_env": ["WEIBO_APP_KEY", "WEIBO_APP_SECRET"],
    #     "description": "Weibo open platform API",
    # },
    # "reddit": {
    #     "module": "src.scrapers.platforms.reddit",
    #     "class": "RedditScraper",
    #     "required_env": ["REDDIT_CLIENT_ID", "REDDIT_CLIENT_SECRET"],
    #     "description": "Reddit via PRAW",
    # },
}


def get_scraper(platform: str, **init_kwargs):
    """
    Instantiate and return the scraper for the given platform name.

    The platform module is imported lazily, so a platform's third-party
    dependencies are only needed when that platform is actually requested.

    Parameters
    ----------
    platform : str
        Key in SCRAPER_REGISTRY (e.g. "x"). Matching ignores case and
        surrounding whitespace.
    **init_kwargs
        Passed directly to the scraper constructor.

    Raises
    ------
    ValueError
        If the platform is not registered.
    ImportError
        If the platform module cannot be imported or does not define the
        registered class.
    """
    platform = platform.strip().lower()
    entry = SCRAPER_REGISTRY.get(platform)
    if entry is None:
        available = ", ".join(SCRAPER_REGISTRY)
        raise ValueError(
            f"Unknown platform '{platform}'. Available platforms: {available}"
        )

    module = importlib.import_module(entry["module"])
    try:
        cls = getattr(module, entry["class"])
    except AttributeError as exc:
        # A wrong class name in the registry is an import problem from the
        # caller's point of view; surface it as the documented ImportError.
        raise ImportError(
            f"Module '{entry['module']}' does not define "
            f"scraper class '{entry['class']}'."
        ) from exc
    return cls(**init_kwargs)
def list_platforms() -> None:
    """
    Print a formatted table of all registered platforms.

    One row per SCRAPER_REGISTRY entry: platform key, required environment
    variables (comma-separated) and description. Column widths start at
    12 and 30 characters and grow to fit the longest value, so platforms
    needing several variables do not push the description column out of line.
    """
    rows = [
        (
            name,
            ", ".join(meta.get("required_env", [])),
            meta.get("description", ""),
        )
        for name, meta in SCRAPER_REGISTRY.items()
    ]
    name_width = max([12] + [len(row[0]) for row in rows])
    env_width = max([30] + [len(row[1]) for row in rows])

    print("\nRegistered scraper platforms:")
    print(
        f"  {'Platform':<{name_width}}  "
        f"{'Required env vars':<{env_width}}  Description"
    )
    print(f"  {'-' * name_width}  {'-' * env_width}  {'-' * 40}")
    for name, env_str, description in rows:
        print(f"  {name:<{name_width}}  {env_str:<{env_width}}  {description}")
    print()