Comprehensive testing procedures and standards for Capcat's hybrid architecture.
Capcat follows a layered testing approach: verify the environment and source discovery, test each source individually with a diagnosis file, run the unit and end-to-end suites, then benchmark performance.
# Activate environment
source venv/bin/activate
# Verify source discovery
python -c "from core.source_system.source_registry import get_source_registry; print(f'{len(get_source_registry().get_available_sources())} sources discovered')"
Test each source individually with diagnosis file creation:
# Test each source with output capture
./capcat fetch hn --count 10 2>&1 | tee test-diagnose-hn.txt
./capcat fetch lb --count 10 2>&1 | tee test-diagnose-lb.txt
./capcat fetch iq --count 10 2>&1 | tee test-diagnose-iq.txt
./capcat fetch bbc --count 10 2>&1 | tee test-diagnose-bbc.txt
./capcat fetch cnn --count 10 2>&1 | tee test-diagnose-cnn.txt
./capcat fetch techcrunch --count 10 2>&1 | tee test-diagnose-techcrunch.txt
./capcat fetch theverge --count 10 2>&1 | tee test-diagnose-theverge.txt
./capcat fetch wired --count 10 2>&1 | tee test-diagnose-wired.txt
./capcat fetch nature --count 10 2>&1 | tee test-diagnose-nature.txt
./capcat fetch scientificamerican --count 10 2>&1 | tee test-diagnose-scientificamerican.txt
./capcat fetch mittechreview --count 10 2>&1 | tee test-diagnose-mittechreview.txt
./capcat fetch gizmodo --count 10 2>&1 | tee test-diagnose-gizmodo.txt
./capcat fetch upi --count 10 2>&1 | tee test-diagnose-upi.txt
./capcat fetch tass --count 10 2>&1 | tee test-diagnose-tass.txt
./capcat fetch straitstimes --count 10 2>&1 | tee test-diagnose-straitstimes.txt
./capcat fetch euronews --count 10 2>&1 | tee test-diagnose-euronews.txt
./capcat fetch axios --count 10 2>&1 | tee test-diagnose-axios.txt
./capcat fetch yahoo --count 10 2>&1 | tee test-diagnose-yahoo.txt
./capcat fetch xinhua --count 10 2>&1 | tee test-diagnose-xinhua.txt
./capcat fetch test_source --count 10 2>&1 | tee test-diagnose-test_source.txt
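Running the same fetch for every source by hand is error-prone; the loop below produces the same test-diagnose-<source>.txt files in one pass. It is a minimal sketch that assumes the ./capcat CLI and the source names exactly as listed above (the registry could supply the list instead):

#!/usr/bin/env python3
# run_all_source_tests.py - sketch; assumes ./capcat and the source list above.
import subprocess

SOURCES = [
    "hn", "lb", "iq", "bbc", "cnn", "techcrunch", "theverge", "wired",
    "nature", "scientificamerican", "mittechreview", "gizmodo", "upi",
    "tass", "straitstimes", "euronews", "axios", "yahoo", "xinhua", "test_source",
]

for name in SOURCES:
    # Capture stdout and stderr together (the shell commands above use 2>&1 | tee)
    result = subprocess.run(
        ["./capcat", "fetch", name, "--count", "10"],
        capture_output=True, text=True,
    )
    with open(f"test-diagnose-{name}.txt", "w") as log:
        log.write(result.stdout)
        log.write(result.stderr)
    print(f"{name}: exit code {result.returncode}")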
After each test, verify media filtering works correctly:
# Count non-image media files (should be 0 without --media flag)
find ../News/news_* \( -name "*.pdf" -o -name "*.mp4" -o -name "*.mp3" \) | wc -l
# Count image files (should be > 0)
find ../News/news_* \( -name "*.jpg" -o -name "*.png" \) | wc -l
# test-diagnose-[SOURCE_NAME].md
**Date**: YYYY-MM-DD HH:MM
**Test Type**: SOURCE_TEST
**Command**: ./capcat fetch [source] --count 10
**Status**: [SUCCESS/FAILURE/PARTIAL/ERROR]
## Results Summary
- **Items Requested**: 10
- **Items Successfully Processed**: X/10
- **Success Rate**: X%
- **Media Files Downloaded**: X images
- **Non-Image Media Files**: 0 (should be 0 unless --media used)
## Core Functionality Verification
- [+/-] Primary function works without errors
- [+/-] Media filtering works correctly
- [+/-] Output format/structure is correct
- [+/-] Error handling works properly
- [+/-] Performance is acceptable
## Errors Encountered
[List any errors: import issues, network failures, logic errors, etc.]
## Output Structure/Results
[Directory structure and file contents]
## Recommendations
[Specific fixes needed: import statements, logic changes, performance improvements]
## Priority Level
[HIGH/MEDIUM/LOW] - Based on impact on core functionality
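Writing the header of each diagnosis file by hand gets tedious across twenty sources; a small helper can stamp it out. This is a hypothetical convenience script, not part of Capcat itself:

# new_diagnosis_file.py - hypothetical helper; pre-fills the template header above.
import sys
from datetime import datetime

HEADER = (
    "# test-diagnose-{source}.md\n"
    "**Date**: {date}\n"
    "**Test Type**: SOURCE_TEST\n"
    "**Command**: ./capcat fetch {source} --count 10\n"
    "**Status**: [SUCCESS/FAILURE/PARTIAL/ERROR]\n"
)

def new_diagnosis_file(source: str) -> str:
    """Create test-diagnose-<source>.md with the header pre-filled."""
    path = f"test-diagnose-{source}.md"
    with open(path, "w") as f:
        f.write(HEADER.format(source=source, date=datetime.now().strftime("%Y-%m-%d %H:%M")))
    return path

if __name__ == "__main__":
    print(new_diagnosis_file(sys.argv[1]))

For a full automated pass over every source, the comprehensive testing script below covers creation, discovery, and timing in one run.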
#!/usr/bin/env python3
"""
Comprehensive source testing script.
Tests all 20 sources with performance monitoring.
"""
import time
import subprocess
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from core.source_system.source_registry import get_source_registry
from core.source_system.performance_monitor import get_performance_monitor
class SourceTester:
def __init__(self):
self.registry = get_source_registry()
self.monitor = get_performance_monitor()
self.results = {}
def test_source(self, source_name: str, count: int = 10) -> dict:
"""Test a single source."""
print(f"Testing {source_name}...")
start_time = time.time()
try:
# Test source creation
source = self.registry.get_source(source_name)
creation_success = True
except Exception as e:
return {
'source_name': source_name,
'creation_success': False,
'creation_error': str(e),
'articles_discovered': 0,
'processing_time': 0
}
try:
# Test article discovery
articles = source.get_articles(count)
articles_discovered = len(articles)
discovery_success = articles_discovered > 0
except Exception as e:
return {
'source_name': source_name,
'creation_success': True,
'discovery_success': False,
'discovery_error': str(e),
'articles_discovered': 0,
'processing_time': time.time() - start_time
}
processing_time = time.time() - start_time
return {
'source_name': source_name,
'creation_success': creation_success,
'discovery_success': discovery_success,
'articles_discovered': articles_discovered,
'processing_time': processing_time,
'avg_time_per_article': processing_time / max(articles_discovered, 1)
}
def test_all_sources(self, count: int = 10) -> dict:
"""Test all available sources."""
sources = self.registry.get_available_sources()
print(f"Testing {len(sources)} sources...")
# Parallel testing
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {
executor.submit(self.test_source, source, count): source
for source in sources
}
for future in futures:
source_name = futures[future]
try:
result = future.result()
self.results[source_name] = result
except Exception as e:
self.results[source_name] = {
'source_name': source_name,
'creation_success': False,
'error': str(e),
'processing_time': 0
}
return self.results
def generate_report(self) -> str:
"""Generate comprehensive test report."""
total_sources = len(self.results)
successful_creation = sum(1 for r in self.results.values() if r.get('creation_success', False))
successful_discovery = sum(1 for r in self.results.values() if r.get('discovery_success', False))
report = f"""# Comprehensive Source Test Report
**Date**: {time.strftime('%Y-%m-%d %H:%M:%S')}
**Sources Tested**: {total_sources}
**Creation Success Rate**: {successful_creation}/{total_sources} ({successful_creation/total_sources*100:.1f}%)
**Discovery Success Rate**: {successful_discovery}/{total_sources} ({successful_discovery/total_sources*100:.1f}%)
## Individual Source Results
"""
for source_name, result in sorted(self.results.items()):
status = "SUCCESS" if result.get('discovery_success', False) else "FAILED"
articles = result.get('articles_discovered', 0)
time_taken = result.get('processing_time', 0)
report += f"### {source_name}\n"
report += f"- **Status**: {status}\n"
report += f"- **Articles Discovered**: {articles}\n"
report += f"- **Processing Time**: {time_taken:.2f}s\n"
if 'error' in result or 'creation_error' in result or 'discovery_error' in result:
error = result.get('error') or result.get('creation_error') or result.get('discovery_error')
report += f"- **Error**: {error}\n"
report += "\n"
# Performance summary
avg_time = sum(r.get('processing_time', 0) for r in self.results.values()) / total_sources
total_articles = sum(r.get('articles_discovered', 0) for r in self.results.values())
report += f"""## Performance Summary
- **Average Processing Time**: {avg_time:.2f}s per source
- **Total Articles Discovered**: {total_articles}
- **Overall Success Rate**: {successful_discovery/total_sources*100:.1f}%
## Recommendations
"""
# Add recommendations based on results
failed_sources = [name for name, result in self.results.items()
if not result.get('discovery_success', False)]
if failed_sources:
report += f"**Failed Sources**: {', '.join(failed_sources)}\n"
report += "- Review configuration and network connectivity\n"
report += "- Check for anti-bot protection\n"
report += "- Validate CSS selectors\n\n"
slow_sources = [name for name, result in self.results.items()
if result.get('processing_time', 0) > 10]
if slow_sources:
report += f"**Slow Sources**: {', '.join(slow_sources)}\n"
report += "- Consider optimization\n"
report += "- Review timeout settings\n"
return report
if __name__ == "__main__":
tester = SourceTester()
results = tester.test_all_sources(count=10)
report = tester.generate_report()
# Save report
with open('comprehensive_test_report.md', 'w') as f:
f.write(report)
print("Test completed. Report saved to comprehensive_test_report.md")
print(f"Success rate: {sum(1 for r in results.values() if r.get('discovery_success', False))}/{len(results)}")
# test_registry.py
import unittest
from core.source_system.source_registry import SourceRegistry, get_source_registry
class TestSourceRegistry(unittest.TestCase):
def setUp(self):
self.registry = SourceRegistry()
def test_source_discovery(self):
"""Test source discovery functionality."""
sources = self.registry.discover_sources()
self.assertGreater(len(sources), 0)
self.assertIn('hn', sources)
self.assertIn('bbc', sources)
def test_source_creation(self):
"""Test source instance creation."""
source = self.registry.get_source('hn')
self.assertIsNotNone(source)
self.assertEqual(source.config.name, 'hn')
def test_validation(self):
"""Test source validation."""
errors = self.registry.validate_all_sources()
# Should be minimal errors in production system
error_count = sum(len(errs) for errs in errors.values())
self.assertLessEqual(error_count, 5) # Allow some minor issues
if __name__ == '__main__':
unittest.main()
# test_performance_monitor.py
import unittest
import time
from core.source_system.performance_monitor import PerformanceMonitor
class TestPerformanceMonitor(unittest.TestCase):
def setUp(self):
self.monitor = PerformanceMonitor()
def test_metrics_recording(self):
"""Test metrics recording."""
self.monitor.record_request('test_source', True, 1.5)
self.monitor.record_request('test_source', False, 2.0)
metrics = self.monitor.get_source_metrics('test_source')
self.assertEqual(metrics.total_requests, 2)
self.assertEqual(metrics.successful_requests, 1)
self.assertEqual(metrics.success_rate, 50.0)
def test_performance_report(self):
"""Test report generation."""
self.monitor.record_request('test', True, 1.0)
report = self.monitor.generate_performance_report()
self.assertIn('test', report)
self.assertIn('100.0%', report)
if __name__ == '__main__':
unittest.main()
# test_e2e_workflow.py
import unittest
import tempfile
import shutil
from pathlib import Path
from core.source_system.source_registry import get_source_registry
class TestE2EWorkflow(unittest.TestCase):
def setUp(self):
self.test_dir = Path(tempfile.mkdtemp())
self.registry = get_source_registry()
def tearDown(self):
shutil.rmtree(self.test_dir)
def test_article_fetch_workflow(self):
"""Test complete article fetching workflow."""
# Get source
source = self.registry.get_source('hn')
# Fetch articles
articles = source.get_articles(count=3)
self.assertGreater(len(articles), 0)
# Fetch content for first article
if articles:
content = source.get_article_content(articles[0]['url'])
self.assertIsNotNone(content)
def test_config_driven_vs_custom(self):
"""Test both source types work."""
# Config-driven source
config_source = self.registry.get_source('iq')
config_articles = config_source.get_articles(count=3)
# Custom source
custom_source = self.registry.get_source('hn')
custom_articles = custom_source.get_articles(count=3)
self.assertGreater(len(config_articles), 0)
self.assertGreater(len(custom_articles), 0)
if __name__ == '__main__':
unittest.main()
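The three test modules above (test_registry.py, test_performance_monitor.py, test_e2e_workflow.py) can be run one at a time, or discovered and run in a single pass. A sketch, assuming they live in a tests/ directory; adjust the start directory to match the actual layout:

# run_unit_tests.py - sketch; assumes the test modules live under tests/.
import sys
import unittest

suite = unittest.defaultTestLoader.discover("tests", pattern="test_*.py")
result = unittest.TextTestRunner(verbosity=2).run(suite)
sys.exit(0 if result.wasSuccessful() else 1)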
# benchmark_performance.py
import time
import statistics
from concurrent.futures import ThreadPoolExecutor
from core.source_system.source_registry import get_source_registry
class PerformanceBenchmark:
def __init__(self):
self.registry = get_source_registry()
def benchmark_source(self, source_name: str, iterations: int = 5) -> dict:
"""Benchmark a single source."""
times = []
article_counts = []
for _ in range(iterations):
start_time = time.time()
source = self.registry.get_source(source_name)
articles = source.get_articles(count=10)
duration = time.time() - start_time
times.append(duration)
article_counts.append(len(articles))
return {
'source_name': source_name,
'avg_time': statistics.mean(times),
'min_time': min(times),
'max_time': max(times),
'std_dev': statistics.stdev(times) if len(times) > 1 else 0,
'avg_articles': statistics.mean(article_counts),
'consistency': statistics.stdev(article_counts) if len(article_counts) > 1 else 0
}
def benchmark_all_sources(self, iterations: int = 3) -> dict:
"""Benchmark all sources."""
sources = self.registry.get_available_sources()[:5] # Test subset for speed
results = {}
for source in sources:
try:
results[source] = self.benchmark_source(source, iterations)
except Exception as e:
results[source] = {'error': str(e)}
return results
def generate_performance_report(self, results: dict) -> str:
"""Generate performance report."""
report = "# Performance Benchmark Report\n\n"
successful_results = {k: v for k, v in results.items() if 'error' not in v}
if successful_results:
avg_times = [r['avg_time'] for r in successful_results.values()]
overall_avg = statistics.mean(avg_times)
report += f"**Overall Average Time**: {overall_avg:.2f}s\n"
report += f"**Sources Tested**: {len(successful_results)}\n\n"
report += "## Individual Source Performance\n\n"
for source, data in sorted(successful_results.items(), key=lambda x: x[1]['avg_time']):
report += f"### {source}\n"
report += f"- Average Time: {data['avg_time']:.2f}s\n"
report += f"- Range: {data['min_time']:.2f}s - {data['max_time']:.2f}s\n"
report += f"- Articles Found: {data['avg_articles']:.1f} avg\n"
report += f"- Consistency: {data['std_dev']:.2f}s std dev\n\n"
# Failed sources
failed_sources = {k: v for k, v in results.items() if 'error' in v}
if failed_sources:
report += "## Failed Sources\n\n"
for source, data in failed_sources.items():
report += f"- **{source}**: {data['error']}\n"
return report
if __name__ == "__main__":
benchmark = PerformanceBenchmark()
results = benchmark.benchmark_all_sources(iterations=3)
report = benchmark.generate_performance_report(results)
with open('performance_benchmark.md', 'w') as f:
f.write(report)
print("Performance benchmark completed. Report saved to performance_benchmark.md")
# Enable debug logging for all modules
import logging
logging.basicConfig(level=logging.DEBUG)
# Check Python path
python -c "import sys; print('\n'.join(sys.path))"
# Verify virtual environment
which python
pip list | grep -iE "(requests|beautifulsoup4|pyyaml)"
# Test network connectivity
import requests
response = requests.get('https://httpbin.org/get', timeout=5)
print(f"Status: {response.status_code}")
# Debug source discovery
import logging
from core.source_system.source_registry import SourceRegistry
registry = SourceRegistry()
registry.logger.setLevel(logging.DEBUG)
sources = registry.discover_sources()
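The individual checks above can be bundled into one quick diagnostic. This sketch only reuses the packages, endpoint, and registry call already shown in this section (beautifulsoup4 imports as bs4, PyYAML as yaml):

# debug_environment.py - sketch combining the environment checks above.
import importlib
import logging

logging.basicConfig(level=logging.DEBUG)

# Dependency check: same packages as the pip list command above
for module in ("requests", "bs4", "yaml"):
    try:
        importlib.import_module(module)
        print(f"{module}: OK")
    except ImportError as exc:
        print(f"{module}: MISSING ({exc})")

# Network check: same endpoint as above
import requests
try:
    response = requests.get("https://httpbin.org/get", timeout=5)
    print(f"Network: status {response.status_code}")
except requests.RequestException as exc:
    print(f"Network: FAILED ({exc})")

# Source discovery with debug logging: same registry call as above
from core.source_system.source_registry import SourceRegistry
registry = SourceRegistry()
print(f"Discovered {len(registry.discover_sources())} sources")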
# Validate test output structure
import os
from pathlib import Path
def validate_test_output(base_path="../News"):
"""Validate test output structure."""
news_dirs = list(Path(base_path).glob("news_*"))
if not news_dirs:
print("No news directories found")
return False
latest_dir = max(news_dirs, key=os.path.getctime)
print(f"Found news directory: {latest_dir}")
source_dirs = list(latest_dir.glob("*_*"))
print(f"Source directories: {len(source_dirs)}")
for source_dir in source_dirs:
article_dirs = list(source_dir.glob("*_*"))
print(f" - {source_dir.name}: {len(article_dirs)} articles")
if article_dirs:
sample_article = article_dirs[0]
article_md = sample_article / "article.md"
if article_md.exists():
print(f" {sample_article.name}/article.md exists")
else:
print(f" {sample_article.name}/article.md missing")
if __name__ == "__main__":
validate_test_output()
This testing guide ensures comprehensive validation of all Capcat components with systematic procedures and clear success criteria.