Complete documentation of EVERY public API function, method, class, and parameter in Capcat's codebase.
Source: Application/core/, Application/docs/api-reference.md
Location: Application/core/source_system/source_registry.py:28
from core.source_system.source_registry import SourceRegistry, get_source_registry
def __init__(self, sources_dir: str = None)
sources_dir (str, optional) - Path to sources directory
# Use default directory
registry = SourceRegistry()
# Use custom directory
registry = SourceRegistry("/custom/sources/path")
def discover_sources(self) -> Dict[str, SourceConfig]
registry = SourceRegistry()
sources = registry.discover_sources()
print(f"Discovered {len(sources)} sources:")
for name, config in sources.items():
print(f" {name}: {config.display_name} ({config.category})")
def get_source(self, source_name: str, session: requests.Session = None) -> BaseSource
source_name (str, required) - Source identifier
session (requests.Session, optional) - HTTP session for connection pooling
registry = get_source_registry()
# Get source with default session
source = registry.get_source('hn')
# Get source with custom session
import requests
custom_session = requests.Session()
source = registry.get_source('bbc', session=custom_session)
# Use source
articles = source.discover_articles(count=10)
def get_available_sources(self) -> List[str]
registry = get_source_registry()
sources = registry.get_available_sources()
print(f"Available sources ({len(sources)}):")
for source_id in sorted(sources):
print(f" - {source_id}")
def get_source_config(self, source_name: str) -> Optional[SourceConfig]
source_name (str, required) - Source identifier
registry = get_source_registry()
config = registry.get_source_config('hn')
if config:
print(f"Name: {config.display_name}")
print(f"URL: {config.base_url}")
print(f"Category: {config.category}")
print(f"Timeout: {config.timeout}s")
else:
print("Source not found")
def get_sources_by_category(self, category: str) -> List[str]
category (str, required) - Category name (tech, news, science, ai, sports, etc.)
registry = get_source_registry()
# Get all tech sources
tech_sources = registry.get_sources_by_category('tech')
print(f"Tech sources: {', '.join(tech_sources)}")
# Get all categories
categories = {}
for source_id in registry.get_available_sources():
config = registry.get_source_config(source_id)
if config.category not in categories:
categories[config.category] = []
categories[config.category].append(source_id)
for category, sources in sorted(categories.items()):
print(f"{category}: {len(sources)} sources")
def validate_all_sources(self, deep_validation: bool = False) -> Dict[str, List[str]]
deep_validation (bool, optional) - Whether to perform network connectivity tests
registry = get_source_registry()
# Basic validation only
errors = registry.validate_all_sources(deep_validation=False)
# Deep validation with network tests
errors = registry.validate_all_sources(deep_validation=True)
# Report errors
for source_name, error_list in errors.items():
if error_list:
print(f"{source_name}: FAILED")
for error in error_list:
print(f" - {error}")
else:
print(f"{source_name}: OK")
def get_source_registry() -> SourceRegistry
from core.source_system.source_registry import get_source_registry
# Get global registry
registry = get_source_registry()
# All calls return same instance
registry1 = get_source_registry()
registry2 = get_source_registry()
assert registry1 is registry2 # True
Location: Application/core/source_system/base_source.py:78
from core.source_system.base_source import BaseSource, SourceConfig, Article
def __init__(self, config: SourceConfig, session: requests.Session = None)
config (SourceConfig, required) - Source configuration
session (requests.Session, optional) - HTTP session
self.config - SourceConfig instance
self.session - requests.Session instance
self.logger - Logger instance
@property
@abstractmethod
def source_type(self) -> str
@property
def source_type(self) -> str:
return "custom"
@abstractmethod
def discover_articles(self, count: int) -> List[Article]
count (int, required) - Maximum number of articles to discover
def discover_articles(self, count: int) -> List[Article]:
response = self.session.get(self.config.base_url)
soup = BeautifulSoup(response.text, 'html.parser')
articles = []
for link in soup.select('.article-link')[:count]:
articles.append(Article(
title=link.get_text(strip=True),
url=self._resolve_url(link['href']),
summary=link.get('aria-label', ''),
tags=['general']
))
self.logger.info(f"Discovered {len(articles)} articles")
return articles
@abstractmethod
def fetch_article_content(
self,
article: Article,
output_dir: str,
progress_callback: Callable = None
) -> Tuple[bool, Optional[str]]
article (Article, required) - Article to fetch
output_dir (str, required) - Directory to save content
progress_callback (Callable, optional) - Progress update function
def fetch_article_content(
self,
article: Article,
output_dir: str,
progress_callback=None
) -> Tuple[bool, Optional[str]]:
try:
# Fetch content
response = self.session.get(article.url)
soup = BeautifulSoup(response.text, 'html.parser')
# Extract content
content = soup.select_one('.article-content')
if not content:
self.logger.error(f"No content found for {article.url}")
return False, None
# Convert to markdown
from core.formatter import html_to_markdown
markdown = html_to_markdown(str(content), article.url)
# Save to file
os.makedirs(output_dir, exist_ok=True)
article_path = os.path.join(output_dir, 'article.md')
with open(article_path, 'w', encoding='utf-8') as f:
f.write(f"# {article.title}\n\n")
f.write(f"URL: {article.url}\n\n")
f.write(markdown)
self.logger.info(f"Saved article to {article_path}")
return True, article_path
except Exception as e:
self.logger.error(f"Failed to fetch article: {e}")
return False, None
def fetch_comments(
self,
article: Article,
output_dir: str,
progress_callback: Callable = None
) -> bool
article (Article, required) - Article to fetch comments for
output_dir (str, required) - Directory to save comments
progress_callback (Callable, optional) - Progress update function
Returns False when supports_comments is False; delegates to _fetch_comments_impl() if supported
source = registry.get_source('hn')
article = Article(
title="Example",
url="https://news.ycombinator.com/item?id=12345",
comment_url="https://news.ycombinator.com/item?id=12345"
)
success = source.fetch_comments(article, "/output/dir")
if success:
print("Comments fetched successfully")
def validate_config(self) -> List[str]
source = registry.get_source('hn')
errors = source.validate_config()
if errors:
print("Validation failed:")
for error in errors:
print(f" - {error}")
else:
print("Configuration is valid")
Location: Application/core/source_system/base_source.py:14
from core.source_system.base_source import SourceConfig
@dataclass
class SourceConfig:
name: str
display_name: str
base_url: str
timeout: float = 10.0
rate_limit: float = 1.0
supports_comments: bool = False
has_comments: bool = False
category: str = "general"
custom_config: Dict[str, Any] = None
name (str, required) - Source identifier
display_name (str, required) - Human-readable name
base_url (str, required) - Base URL
timeout (float, default=10.0) - Request timeout seconds
rate_limit (float, default=1.0) - Minimum seconds between requests
supports_comments (bool, default=False) - Comments support flag
has_comments (bool, default=False) - Comments enabled flag
category (str, default="general") - Category name
custom_config (Dict, default=None) - Additional configuration
def to_dict(self) -> Dict[str, Any]
config = SourceConfig(
name="example",
display_name="Example News",
base_url="https://example.com/",
category="tech"
)
config_dict = config.to_dict()
print(config_dict)
# {
# 'name': 'example',
# 'display_name': 'Example News',
# 'base_url': 'https://example.com/',
# 'timeout': 10.0,
# 'rate_limit': 1.0,
# 'supports_comments': False,
# 'has_comments': False,
# 'category': 'tech'
# }
Location: Application/core/source_system/base_source.py:59
from core.source_system.base_source import Article
@dataclass
class Article:
title: str
url: str
comment_url: Optional[str] = None
author: Optional[str] = None
published_date: Optional[str] = None
summary: Optional[str] = None
tags: List[str] = None
title (str, required) - Article title
url (str, required) - Article URL
comment_url (Optional[str], default=None) - Comments URL
author (Optional[str], default=None) - Author name
published_date (Optional[str], default=None) - Publication date
summary (Optional[str], default=None) - Article summary
tags (List[str], default=None) - Article tags
article = Article(
title="Breaking News: AI Breakthrough",
url="https://example.com/article/123",
comment_url="https://example.com/article/123/comments",
author="John Doe",
published_date="2025-11-25",
summary="Researchers announce major AI advancement...",
tags=["ai", "tech", "research"]
)
print(f"{article.title} by {article.author}")
print(f"URL: {article.url}")
print(f"Tags: {', '.join(article.tags)}")
Location: Application/core/article_fetcher.py:110
from core.article_fetcher import ArticleFetcher, convert_html_with_timeout
def convert_html_with_timeout(
html_content: str,
url: str,
timeout: int = 30
) -> str
html_content (str, required) - Raw HTML to convert
url (str, required) - Source URL for logging
timeout (int, default=30) - Maximum conversion time seconds
from core.article_fetcher import convert_html_with_timeout
html = "<html><body><h1>Title</h1><p>Content</p></body></html>"
markdown = convert_html_with_timeout(html, "https://example.com")
print(markdown)
# # Title
#
# Content
def set_global_update_mode(update_mode: bool)
update_mode (bool, required) - Enable/disable update mode
from core.article_fetcher import set_global_update_mode
# Enable update mode
set_global_update_mode(True)
# Process articles (will overwrite existing)
# ...
# Disable update mode
set_global_update_mode(False)
def get_global_update_mode() -> bool
from core.article_fetcher import get_global_update_mode
if get_global_update_mode():
print("Update mode is enabled - will overwrite existing articles")
else:
print("Update mode is disabled - will skip existing articles")
Location: Application/core/config.py
from core.config import get_config, load_config, FetchNewsConfig
from core.config import NetworkConfig, ProcessingConfig, LoggingConfig, UIConfig
def get_config() -> FetchNewsConfig
from core.config import get_config
config = get_config()
print(f"Max workers: {config.processing.max_workers}")
print(f"Timeout: {config.network.connect_timeout}s")
print(f"Log level: {config.logging.default_level}")
def load_config(config_file: Optional[str] = None) -> FetchNewsConfig
config_file (Optional[str], default=None) - Path to config file
from core.config import load_config
# Load from default locations
config = load_config()
# Load from specific file
config = load_config("custom-config.yml")
# Access configuration
print(f"User agent: {config.network.user_agent}")
print(f"Download images: {config.processing.download_images}")
Location: Application/core/logging_config.py
from core.logging_config import get_logger, setup_logging
def get_logger(name: str = None) -> logging.Logger
name (str, optional) - Logger name (defaults to caller's module name)
from core.logging_config import get_logger
# Get logger for current module
logger = get_logger(__name__)
# Use logger
logger.debug("Debug message")
logger.info("Info message")
logger.warning("Warning message")
logger.error("Error message")
logger.critical("Critical message")
def setup_logging(
log_level: str = "INFO",
log_file: str = None,
use_colors: bool = True
) -> None
log_level (str, default="INFO") - Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
log_file (str, optional) - Path to log file
use_colors (bool, default=True) - Enable colored console output
from core.logging_config import setup_logging, get_logger
# Setup logging
setup_logging(
log_level="DEBUG",
log_file="capcat.log",
use_colors=True
)
# Use logger
logger = get_logger(__name__)
logger.debug("Logging is configured")
Location: Application/core/utils.py
from core.utils import (
sanitize_filename,
create_output_directory_capcat,
resolve_url
)
def sanitize_filename(filename: str, max_length: int = 100) -> str
filename (str, required) - Filename to sanitize
max_length (int, default=100) - Maximum filename length
from core.utils import sanitize_filename
# Sanitize filename
clean = sanitize_filename("My Article: Cool Stuff (2025).md")
print(clean)
# My_Article_Cool_Stuff_2025.md
# With length limit
short = sanitize_filename("Very Long Article Title That Exceeds Limit", max_length=20)
print(short)
# Very_Long_Article_...
def create_output_directory_capcat(
base_dir: str,
article_title: str,
source_name: str = "",
date_str: str = None
) -> str
base_dir (str, required) - Base output directory
article_title (str, required) - Article title
source_name (str, default="") - Source identifier
date_str (str, optional) - Date string (auto-generated if None)
from core.utils import create_output_directory_capcat
output_dir = create_output_directory_capcat(
base_dir="../News",
article_title="Breaking News Article",
source_name="bbc",
date_str="25-11-2025"
)
print(output_dir)
# ../News/news_25-11-2025/BBC_25-11-2025/01_Breaking_News_Article/
Core API modules: