Capcat implements a hybrid modular architecture combining config-driven simplicity with custom code flexibility. This document explains the technical architecture, design patterns, and implementation logic for junior developers.
- cli.py - Command-line argument parsing
- interactive.py - Interactive menu system
- capcat (bash wrapper) - Entry point automation
- capcat.py - Main application logic
- core/config.py - Configuration management
- core/logging_config.py - Logging setup
- core/shutdown.py - Graceful shutdown handling

sources/active/config_driven/configs/*.yaml
# sources/active/config_driven/configs/iq.yaml
display_name: "InfoQ"
base_url: "https://www.infoq.com/news/"
category: tech

# Article discovery
article_selectors:
  - .card__title a
  - .news-headline a

# Content extraction
content_selectors:
  - .article__content
  - article.article

# Image handling
image_selectors:
  - .article__image img
  - figure img

# Optional: Custom settings
max_articles: 50
timeout: 30
1. Registry discovers YAML files
2. Validator checks schema compliance
3. ConfigDrivenSource loads configuration
4. Source uses BeautifulSoup for extraction
5. Selectors applied in order until a match is found (see the sketch after this list)
6. Content converted to Markdown
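A minimal sketch of steps 3-5 above, assuming a config shaped like the InfoQ example (the helper name extract_with_config is hypothetical; the real logic lives in ConfigDrivenSource):

import yaml
from bs4 import BeautifulSoup

def extract_with_config(html, config_path):
    """Load a YAML source config and apply its content selectors in order."""
    with open(config_path) as f:
        config = yaml.safe_load(f)
    soup = BeautifulSoup(html, 'lxml')
    for selector in config['content_selectors']:
        node = soup.select_one(selector)
        if node:
            # First successful match wins; Markdown conversion (step 6)
            # happens later in the pipeline
            return str(node)
    return None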
sources/active/custom/<source_name>/source.py
# sources/active/custom/hn/source.py
from core.source_system.base_source import BaseSource, Article

class HackerNewsSource(BaseSource):
    """
    Hacker News source with comment integration
    """

    def __init__(self, config, session=None):
        super().__init__(config, session)
        self.api_base = "https://hacker-news.firebaseio.com/v0"

    def get_articles(self, count=30):
        """
        Fetch articles from Hacker News API

        Returns:
            List[Article]: Article objects with content
        """
        # Fetch top story IDs
        story_ids = self._fetch_top_stories(count)

        # Fetch article details in parallel
        articles = self._fetch_articles_parallel(story_ids)

        # Fetch comments for each article
        for article in articles:
            article.comments = self._fetch_comments(article.id)

        return articles

    def _fetch_top_stories(self, count):
        """Fetch top story IDs from API"""
        response = self.session.get(f"{self.api_base}/topstories.json")
        return response.json()[:count]

    def _fetch_article_content(self, story_id):
        """Fetch individual article"""
        # Custom logic for HN API
        pass

    def _fetch_comments(self, story_id):
        """Recursively fetch comment tree"""
        # Custom comment parsing
        pass
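A hedged sketch of what _fetch_article_content might look like. The item endpoint (/item/<id>.json) and its title/url fields are part of the public Hacker News Firebase API; the Article constructor arguments are an assumption for illustration:

    def _fetch_article_content(self, story_id):
        """Fetch a single story's metadata from the HN item endpoint"""
        response = self.session.get(f"{self.api_base}/item/{story_id}.json")
        response.raise_for_status()
        item = response.json()
        # Ask HN / text posts have no external URL; fall back to the HN thread
        url = item.get("url", f"https://news.ycombinator.com/item?id={story_id}")
        return Article(url=url, title=item.get("title", ""))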
def embed_media(content):
    """
    Replace remote media URLs with local file references

    Logic:
    1. Parse HTML/Markdown for media tags
    2. Download each media file
    3. Generate local path (images/01.jpg)
    4. Replace URL with relative path
    5. Update content with new references
    """
    media_map = {}

    for idx, media_url in enumerate(extract_media_urls(content)):
        # Download file
        local_path = download_media(media_url, f"images/{idx:02d}")

        # Map old URL to new path
        media_map[media_url] = local_path

    # Replace all occurrences
    for old_url, new_path in media_map.items():
        content = content.replace(old_url, new_path)

    return content, media_map
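The extract_media_urls helper referenced above is not shown here; a minimal sketch, assuming the content may be HTML or Markdown, could look like this:

import re
from bs4 import BeautifulSoup

def extract_media_urls(content):
    """Collect <img src> values plus Markdown image links from the content"""
    soup = BeautifulSoup(content, 'lxml')
    urls = [img['src'] for img in soup.find_all('img', src=True)]
    urls += re.findall(r'!\[[^\]]*\]\((https?://[^)]+)\)', content)
    return urls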
def extract_content(html, selectors):
    """
    Extract article content using CSS selectors

    Logic:
    1. Try each selector in order
    2. Return first successful match
    3. Fallback to readability algorithm
    4. Clean extracted content
    """
    soup = BeautifulSoup(html, 'lxml')

    # Try each configured selector
    for selector in selectors:
        content = soup.select_one(selector)
        if content and len(content.get_text()) > 100:
            return clean_content(content)

    # Fallback: Readability algorithm
    return extract_via_readability(html)
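For example, with the content selectors from the InfoQ config shown earlier:

content = extract_content(html, ['.article__content', 'article.article'])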
def create_output_structure(article, batch_mode=True):
    """
    Generate output folder structure

    Batch mode:
        ../News/news_DD-MM-YYYY/Source_DD-MM-YYYY/NN_Title/
    Single mode:
        ../Capcats/cc_DD-MM-YYYY-Title/
    """
    date_str = datetime.now().strftime('%d-%m-%Y')

    if batch_mode:
        # Batch: organized by date and source
        base_dir = f"../News/news_{date_str}"
        source_dir = f"{article.source}_{date_str}"
        article_dir = f"{article.number:02d}_{slugify(article.title)}"
        path = Path(base_dir) / source_dir / article_dir
    else:
        # Single: direct archiving
        article_name = f"cc_{date_str}-{slugify(article.title)}"
        path = Path("../Capcats") / article_name

    # Create structure
    path.mkdir(parents=True, exist_ok=True)
    (path / "images").mkdir(exist_ok=True)
    (path / "files").mkdir(exist_ok=True)

    return path
def slugify(title, max_length=60):
    """
    Convert article title to filesystem-safe name

    Logic:
    1. Convert to lowercase
    2. Remove special characters
    3. Replace spaces with underscores
    4. Truncate to max_length
    5. Remove trailing underscores
    """
    slug = title.lower()
    slug = re.sub(r'[^a-z0-9\s-]', '', slug)
    slug = re.sub(r'[\s-]+', '_', slug)
    slug = slug[:max_length].rstrip('_')
    return slug
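For example (illustrative):

slugify("Breaking: AI Model Sets New Record!")
# -> 'breaking_ai_model_sets_new_record'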
class SourceFactory:
    @staticmethod
    def create_source(config, session=None):
        """
        Create appropriate source type based on configuration

        Returns:
            ConfigDrivenSource or CustomSource instance
        """
        if config.source_type == 'config_driven':
            return ConfigDrivenSource(config, session)
        elif config.source_type == 'custom':
            return load_custom_source(config, session)
        else:
            raise ValueError(f"Unknown source type: {config.source_type}")
class SourceRegistry:
    def __init__(self):
        self._sources = {}
        self._configs = {}

    def discover_sources(self):
        """
        Auto-discover sources from filesystem

        Process:
        1. Scan sources/active/config_driven/configs/*.yaml
        2. Scan sources/active/custom/*/source.py
        3. Validate each source
        4. Register in internal registry
        """
        # Discover config-driven sources
        config_dir = Path("sources/active/config_driven/configs")
        for yaml_file in config_dir.glob("*.yaml"):
            config = self._load_config(yaml_file)
            if self._validate_config(config):
                self._register_source(config)

        # Discover custom sources
        custom_dir = Path("sources/active/custom")
        for source_dir in custom_dir.iterdir():
            if (source_dir / "source.py").exists():
                config = self._load_custom_config(source_dir)
                self._register_source(config)

        # Return the registered configs so callers (and tests) can inspect them
        return self._configs

    def get_source(self, source_id, session=None):
        """Retrieve source instance by ID"""
        config = self._configs.get(source_id)
        if not config:
            raise ValueError(f"Source not found: {source_id}")
        return SourceFactory.create_source(config, session)
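Typical usage (illustrative):

registry = SourceRegistry()
registry.discover_sources()
hn_source = registry.get_source('hn')
articles = hn_source.get_articles(count=10)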
class ExtractionStrategy:
    def extract_content(self, html):
        raise NotImplementedError

class SelectorStrategy(ExtractionStrategy):
    def __init__(self, selectors):
        self.selectors = selectors

    def extract_content(self, html):
        soup = BeautifulSoup(html, 'lxml')
        for selector in self.selectors:
            content = soup.select_one(selector)
            if content:
                return content
        return None

class ReadabilityStrategy(ExtractionStrategy):
    def extract_content(self, html):
        # Use readability algorithm
        return extract_readable_content(html)

class APIStrategy(ExtractionStrategy):
    def extract_content(self, api_response):
        # Parse API JSON response
        return api_response['content']
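One way the strategies might be composed (an assumption, not necessarily the actual Capcat wiring): try selector-based extraction first and fall back to readability.

class FallbackExtractor:
    def __init__(self, strategies):
        self.strategies = strategies

    def extract_content(self, html):
        # Return the first non-empty result in strategy order
        for strategy in self.strategies:
            content = strategy.extract_content(html)
            if content:
                return content
        return None

extractor = FallbackExtractor([SelectorStrategy(['.article__content']), ReadabilityStrategy()])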
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class SessionPool:
    def __init__(self):
        self._session = None

    def get_session(self):
        """
        Get or create shared requests.Session

        Benefits:
        - Connection reuse (HTTP keep-alive)
        - Cookie persistence
        - Retry logic configuration
        - Timeout management
        """
        if self._session is None:
            self._session = requests.Session()

            # Configure retry logic
            retry = Retry(
                total=3,
                backoff_factor=0.5,
                status_forcelist=[500, 502, 503, 504]
            )
            adapter = HTTPAdapter(max_retries=retry)
            self._session.mount('http://', adapter)
            self._session.mount('https://', adapter)

            # Set default headers
            self._session.headers.update({
                'User-Agent': 'Capcat/2.0 (News Archiver)'
            })

        return self._session

    def close(self):
        """Clean up session"""
        if self._session:
            self._session.close()
            self._session = None
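Illustrative usage: one shared session for every fetch in a run.

pool = SessionPool()
session = pool.get_session()
response = session.get("https://www.infoq.com/news/", timeout=30)
pool.close()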
class ProgressTracker:
    def __init__(self):
        self._observers = []

    def attach(self, observer):
        """Register progress observer"""
        self._observers.append(observer)

    def update_progress(self, current, total, message=""):
        """Notify all observers of progress update"""
        for observer in self._observers:
            observer.on_progress(current, total, message)

class TerminalProgressObserver:
    def on_progress(self, current, total, message):
        """Display progress bar in terminal"""
        percentage = (current / total) * 100
        bar = "=" * int(percentage / 2)
        print(f"\r[{bar:<50}] {percentage:.0f}% {message}", end="")

class LogProgressObserver:
    def on_progress(self, current, total, message):
        """Log progress to file"""
        logger.info(f"Progress: {current}/{total} - {message}")
def fetch_with_retry(url, max_retries=3):
    """
    Fetch URL with exponential backoff retry

    Logic:
    1. Try request
    2. If fails, wait and retry
    3. Exponential backoff: 1s, 2s, 4s
    4. After max_retries, raise exception
    """
    for attempt in range(max_retries):
        try:
            # session is the shared requests.Session from SessionPool
            response = session.get(url, timeout=30)
            response.raise_for_status()
            return response
        except requests.Timeout:
            if attempt == max_retries - 1:
                raise TimeoutError(f"Request timeout after {max_retries} attempts")
            time.sleep(2 ** attempt)  # Exponential backoff
        except requests.ConnectionError:
            if attempt == max_retries - 1:
                raise NetworkError(f"Connection failed after {max_retries} attempts")
            time.sleep(2 ** attempt)
        except requests.HTTPError as e:
            if e.response.status_code >= 500:
                # Server error, retry
                if attempt == max_retries - 1:
                    raise SourceUnavailableError(f"Server error: {e}")
                time.sleep(2 ** attempt)
            else:
                # Client error, don't retry
                raise ArticleFetchError(f"HTTP {e.response.status_code}: {e}")
def process_articles(articles):
    """
    Process articles with graceful failure handling

    Strategy:
    - Continue processing on individual failures
    - Collect errors for reporting
    - Save successful articles
    - Report summary at end
    """
    successful = []
    failed = []

    for article in articles:
        try:
            processed = process_article(article)
            successful.append(processed)
        except MediaDownloadError as e:
            # Save article without media
            logger.warning(f"Media download failed: {e}")
            article.media_warning = str(e)
            successful.append(article)
        except ProcessingError as e:
            # Skip this article, continue with others
            logger.error(f"Failed to process article: {e}")
            failed.append((article, e))
        except Exception as e:
            # Unexpected error, log and continue
            logger.exception(f"Unexpected error: {e}")
            failed.append((article, e))

    # Report summary
    logger.info(f"Processed {len(successful)}/{len(articles)} articles")
    if failed:
        logger.warning(f"Failed articles: {len(failed)}")
        for article, error in failed:
            logger.warning(f"  - {article.title}: {error}")

    return successful, failed
def fetch_articles_parallel(sources, count):
    """
    Fetch articles from multiple sources concurrently

    Performance gain: 5x faster than sequential
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    with ThreadPoolExecutor(max_workers=len(sources)) as executor:
        # Submit all source fetching tasks
        future_to_source = {
            executor.submit(source.get_articles, count): source
            for source in sources
        }

        # Collect results as they complete
        results = {}
        for future in as_completed(future_to_source):
            source = future_to_source[future]
            try:
                articles = future.result(timeout=300)  # 5 min timeout
                results[source.id] = articles
            except Exception as e:
                logger.error(f"Source {source.id} failed: {e}")
                results[source.id] = []

    return results
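Illustrative call for a small batch run, using the registry and shared session shown earlier:

sources = [registry.get_source(source_id, session) for source_id in ('hn', 'bbc', 'nature')]
results = fetch_articles_parallel(sources, count=10)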
# Without pooling
for url in urls:
    response = requests.get(url)  # New connection each time
    # 100-500ms connection overhead per request

# With pooling (via Session)
session = requests.Session()
for url in urls:
    response = session.get(url)  # Reuse existing connection
    # ~10ms overhead after first connection
class Article:
    def __init__(self, url, title):
        self.url = url
        self.title = title
        self._content = None   # Lazy loaded
        self._comments = None  # Lazy loaded

    @property
    def content(self):
        """Lazy load content only when accessed"""
        if self._content is None:
            self._content = fetch_content(self.url)
        return self._content

    @property
    def comments(self):
        """Lazy load comments only when accessed"""
        if self._comments is None:
            self._comments = fetch_comments(self.url)
        return self._comments
import yaml
from functools import lru_cache
from pathlib import Path

@lru_cache(maxsize=1000)
def fetch_source_config(source_id):
    """
    Cache source configurations

    Avoids: Re-reading YAML files multiple times
    """
    config_path = Path(f"sources/active/config_driven/configs/{source_id}.yaml")
    with open(config_path) as f:
        return yaml.safe_load(f)
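The cache can be cleared explicitly (for example in tests) via the standard lru_cache API:

fetch_source_config.cache_clear()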
# Test individual components in isolation
class TestSourceRegistry:
    def test_discover_sources(self):
        registry = SourceRegistry()
        sources = registry.discover_sources()
        assert len(sources) > 0
        assert 'hn' in sources
        assert 'bbc' in sources

    def test_get_source(self):
        registry = SourceRegistry()
        registry.discover_sources()
        source = registry.get_source('hn')
        assert isinstance(source, BaseSource)
        assert source.id == 'hn'
        assert source.display_name == 'Hacker News'
# Test component interactions
class TestArticleFetching:
    def test_fetch_and_process_article(self):
        source = HackerNewsSource(config, session)
        articles = source.get_articles(count=5)

        assert len(articles) == 5
        for article in articles:
            # Test content fetching
            assert article.content is not None
            assert len(article.content) > 100

            # Test media processing
            assert article.images is not None

            # Test output generation
            output_path = generate_output(article)
            assert output_path.exists()
            assert (output_path / "article.md").exists()
# Test complete user workflows
import subprocess
from datetime import datetime
from pathlib import Path

def test_bundle_command():
    """Test: ./capcat bundle tech --count 10"""
    result = subprocess.run(
        ["./capcat", "bundle", "tech", "--count", "10"],
        capture_output=True,
        text=True
    )

    assert result.returncode == 0
    assert "articles saved" in result.stdout

    # Verify output structure
    news_dir = Path("../News")
    assert news_dir.exists()

    # Check created folders
    date_str = datetime.now().strftime('%d-%m-%Y')
    batch_dir = news_dir / f"news_{date_str}"
    assert batch_dir.exists()

    # Count articles
    article_dirs = list(batch_dir.rglob("*/article.md"))
    assert len(article_dirs) >= 10  # Should have ~30 (10 per source)
1. Command-line arguments (highest priority)
2. Environment variables
3. Config file (capcat.yml)
4. Default values (lowest priority)
# capcat.yml

# Default settings
default_count: 30
output_directory: "../News"
media_downloads: false  # images only
html_generation: false

# Network settings
timeout: 30
max_retries: 3
user_agent: "Capcat/2.0"

# Source preferences
preferred_sources:
  - hn
  - bbc
  - nature

# Bundle definitions (custom)
custom_bundles:
  mytech:
    sources: [hn, lb, iq, ieee]
    description: "My tech bundle"

# Logging
log_level: INFO
log_file: "capcat.log"

# Privacy
anonymize_usernames: true
import os
import yaml
from pathlib import Path

def get_config():
    """
    Load configuration from multiple sources

    Priority:
    1. CLI args override everything
    2. Environment variables override file
    3. Config file overrides defaults
    4. Built-in defaults
    """
    # Start with defaults
    config = {
        'default_count': 30,
        'output_directory': '../News',
        'media_downloads': False,
        'html_generation': False,
        'timeout': 30,
    }

    # Load from config file if exists
    config_file = Path('capcat.yml')
    if config_file.exists():
        with open(config_file) as f:
            file_config = yaml.safe_load(f)
            config.update(file_config or {})

    # Override with environment variables
    if 'CAPCAT_OUTPUT_DIR' in os.environ:
        config['output_directory'] = os.environ['CAPCAT_OUTPUT_DIR']
    if 'CAPCAT_DEFAULT_COUNT' in os.environ:
        config['default_count'] = int(os.environ['CAPCAT_DEFAULT_COUNT'])

    # CLI arguments applied by argparse in cli.py
    return config
def validate_count(count):
    """Validate article count parameter"""
    if not isinstance(count, int):
        raise ValidationError("Count must be integer")
    if count < 1:
        raise ValidationError("Count must be positive")
    if count > 1000:
        raise ValidationError("Count limited to 1000 (performance)")
    return count

def validate_url(url):
    """Validate single article URL"""
    parsed = urllib.parse.urlparse(url)
    if parsed.scheme not in ('http', 'https'):
        raise ValidationError("URL must use http or https")
    if not parsed.netloc:
        raise ValidationError("Invalid URL format")
    return url
def sanitize_path(user_input):
    """
    Prevent path traversal attacks

    Blocks: ../../../etc/passwd
    """
    # Remove path traversal attempts
    cleaned = user_input.replace('..', '')

    # Remove absolute paths
    cleaned = cleaned.lstrip('/')

    # Remove special characters
    cleaned = re.sub(r'[^\w\s-]', '', cleaned)

    return cleaned
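Illustrative result of the sanitizer above (note that path separators are stripped along with other special characters):

sanitize_path("../../../etc/passwd")
# -> 'etcpasswd'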
def anonymize_comment(comment_html):
    """
    Replace usernames with 'Anonymous' for privacy

    Preserves: Profile links (for attribution)
    Removes: Identifying usernames in text
    """
    soup = BeautifulSoup(comment_html, 'lxml')

    # Find username elements
    for username_elem in soup.find_all(class_='username'):
        # Capture the profile URL (if any) before rewriting the element
        profile_link = username_elem.find('a')
        profile_url = profile_link.get('href') if profile_link else None

        # Replace with anonymous
        username_elem.string = 'Anonymous'

        # Preserve profile link (for attribution)
        if profile_url:
            username_elem['data-original-profile'] = profile_url

    return str(soup)