Introduction to Subdomain Discovery Automation
Manual subdomain discovery is time-consuming and error-prone. As organizations grow and cloud infrastructure becomes more complex, the attack surface expands faster than any team can track by hand. Automation is essential for security teams to keep pace with modern infrastructure and maintain comprehensive visibility into their digital assets.
This guide demonstrates how to build sophisticated automation pipelines that combine multiple data sources, handle rate limiting, integrate with existing security workflows, and scale to handle enterprise-level reconnaissance requirements.
Architecture Design Principles
Effective subdomain discovery automation follows key architectural principles:
Modular Design
- Separation of concerns: Data collection, processing, and output
- Pluggable components: New data sources slot in behind a common interface (see the sketch after this list)
- Error isolation: Failures in one module don't crash the pipeline
- Independent scaling: Scale different components based on load
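The pluggable-components idea is easiest to see as a small interface. The sketch below is a minimal illustration under assumed names (DiscoverySource, CrtShSource, and run_sources are hypothetical and not part of the framework shown later in this guide):
import asyncio
from abc import ABC, abstractmethod
from typing import List
class DiscoverySource(ABC):
    """Hypothetical plugin interface: every data source exposes the same method"""
    name: str = "base"
    @abstractmethod
    async def discover(self, domain: str) -> List[str]:
        """Return candidate subdomains for the given domain"""
class CrtShSource(DiscoverySource):
    """Example plugin wrapping a single passive source (body omitted)"""
    name = "crt.sh"
    async def discover(self, domain: str) -> List[str]:
        # A real plugin would query https://crt.sh/?q=%25.<domain>&output=json here
        return []
async def run_sources(domain: str, sources: List[DiscoverySource]) -> List[str]:
    """Run all plugins concurrently; a failure in one source does not stop the others"""
    results: List[str] = []
    outcomes = await asyncio.gather(
        *(source.discover(domain) for source in sources), return_exceptions=True
    )
    for source, outcome in zip(sources, outcomes):
        if isinstance(outcome, Exception):
            print(f"[warn] source {source.name} failed: {outcome}")
        else:
            results.extend(outcome)
    return results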
Data Source Integration
- Multiple passive sources: Certificate Transparency, DNS databases
- Active enumeration: DNS brute forcing and zone transfers
- API aggregation: Combine commercial and free services
- Historical data: Track changes over time
Scalability and Performance
- Concurrent processing: Parallel data collection
- Intelligent rate limiting: Respect API limits and avoid blocking
- Caching mechanisms: Avoid redundant API calls
- Resource optimization: Efficient memory and CPU usage
Building a Python-Based Discovery Framework
Python provides excellent libraries for building robust automation frameworks:
Core Framework Structure
#!/usr/bin/env python3
"""
Comprehensive Subdomain Discovery Framework
Combines multiple data sources with intelligent processing
"""
import asyncio
import aiohttp
import dns.resolver
import json
import time
import logging
from dataclasses import dataclass
from typing import List, Set, Dict, Optional
from pathlib import Path
@dataclass
class SubdomainResult:
"""Represents a discovered subdomain with metadata"""
subdomain: str
source: str
timestamp: float
    ip_addresses: Optional[List[str]] = None
    http_status: Optional[int] = None
    technologies: Optional[List[str]] = None
class SubdomainDiscoveryFramework:
"""Main framework for automated subdomain discovery"""
def __init__(self, config_file: str = "config.json"):
self.config = self.load_config(config_file)
self.session = None
self.results: Set[str] = set()
self.detailed_results: List[SubdomainResult] = []
self.setup_logging()
def load_config(self, config_file: str) -> Dict:
"""Load configuration from JSON file"""
try:
with open(config_file, 'r') as f:
return json.load(f)
except FileNotFoundError:
return self.default_config()
def default_config(self) -> Dict:
"""Default configuration"""
return {
"sources": {
"certificate_transparency": True,
"dns_brute_force": True,
"passive_dns": True,
"search_engines": False
},
"rate_limits": {
"ct_logs": 10, # requests per second
"dns_queries": 100,
"http_requests": 50
},
"timeouts": {
"dns": 5,
"http": 10,
"total": 300
},
"wordlists": {
"common": "wordlists/common.txt",
"comprehensive": "wordlists/comprehensive.txt"
}
}
def setup_logging(self):
"""Configure logging"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('subdomain_discovery.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
async def discover_subdomains(self, domain: str) -> List[SubdomainResult]:
"""Main discovery method coordinating all sources"""
self.logger.info(f"Starting subdomain discovery for {domain}")
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=self.config["timeouts"]["total"])
) as session:
self.session = session
# Collect tasks for parallel execution
tasks = []
if self.config["sources"]["certificate_transparency"]:
tasks.append(self.discover_from_ct_logs(domain))
if self.config["sources"]["dns_brute_force"]:
tasks.append(self.discover_via_dns_brute_force(domain))
if self.config["sources"]["passive_dns"]:
tasks.append(self.discover_from_passive_dns(domain))
# Execute all discovery methods concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results and handle exceptions
for i, result in enumerate(results):
if isinstance(result, Exception):
self.logger.error(f"Task {i} failed: {result}")
else:
self.detailed_results.extend(result)
# Post-process results
await self.validate_and_enrich_results()
return self.detailed_results
async def discover_from_ct_logs(self, domain: str) -> List[SubdomainResult]:
"""Discover subdomains from Certificate Transparency logs"""
self.logger.info(f"Querying Certificate Transparency logs for {domain}")
ct_sources = [
f"https://crt.sh/?q=%25.{domain}&output=json",
f"https://api.certspotter.com/v1/issuances?domain={domain}&include_subdomains=true&expand=dns_names"
]
results = []
rate_limiter = asyncio.Semaphore(self.config["rate_limits"]["ct_logs"])
        async def query_ct_source(url: str) -> List[SubdomainResult]:
            async with rate_limiter:
                try:
                    async with self.session.get(url) as response:
                        if response.status == 200:
                            data = await response.json()
                            return self.parse_ct_response(data, domain)
                        self.logger.warning(f"CT source {url} returned HTTP {response.status}")
                except Exception as e:
                    self.logger.error(f"CT query failed for {url}: {e}")
                return []
# Query all CT sources concurrently
tasks = [query_ct_source(url) for url in ct_sources]
ct_results = await asyncio.gather(*tasks, return_exceptions=True)
for result in ct_results:
if not isinstance(result, Exception):
results.extend(result)
self.logger.info(f"Found {len(results)} subdomains from CT logs")
return results
def parse_ct_response(self, data: List[Dict], domain: str) -> List[SubdomainResult]:
"""Parse Certificate Transparency response data"""
results = []
seen = set()
for entry in data:
# Handle different CT log response formats
if 'name_value' in entry:
names = entry['name_value'].split('\n')
elif 'dns_names' in entry:
names = entry['dns_names']
else:
continue
for name in names:
name = name.strip().lower()
if name.startswith('*.'):
name = name[2:]
if (name.endswith(f'.{domain}') or name == domain) and name not in seen:
seen.add(name)
results.append(SubdomainResult(
subdomain=name,
source="Certificate Transparency",
timestamp=time.time()
))
return results
async def discover_via_dns_brute_force(self, domain: str) -> List[SubdomainResult]:
"""Discover subdomains via DNS brute force"""
self.logger.info(f"Starting DNS brute force for {domain}")
wordlist_path = self.config["wordlists"]["common"]
if not Path(wordlist_path).exists():
self.logger.warning(f"Wordlist not found: {wordlist_path}")
return []
with open(wordlist_path, 'r') as f:
words = [line.strip() for line in f if line.strip()]
# Concurrent DNS resolution with rate limiting
semaphore = asyncio.Semaphore(self.config["rate_limits"]["dns_queries"])
results = []
async def resolve_subdomain(word: str) -> Optional[SubdomainResult]:
async with semaphore:
subdomain = f"{word}.{domain}"
try:
# Use asyncio-compatible DNS resolution
loop = asyncio.get_event_loop()
                    resolver = dns.resolver.Resolver()
                    resolver.timeout = self.config["timeouts"]["dns"]
                    resolver.lifetime = self.config["timeouts"]["dns"]
                    # Run the blocking lookup in the default shared thread pool
                    answers = await loop.run_in_executor(
                        None, resolver.resolve, subdomain, 'A'
                    )
                    ip_addresses = [str(rdata) for rdata in answers]
                    return SubdomainResult(
                        subdomain=subdomain,
                        source="DNS Brute Force",
                        timestamp=time.time(),
                        ip_addresses=ip_addresses
                    )
except Exception:
return None
# Execute DNS queries concurrently
tasks = [resolve_subdomain(word) for word in words]
dns_results = await asyncio.gather(*tasks, return_exceptions=True)
results = [r for r in dns_results if isinstance(r, SubdomainResult)]
self.logger.info(f"Found {len(results)} subdomains via DNS brute force")
return results
async def discover_from_passive_dns(self, domain: str) -> List[SubdomainResult]:
"""Discover subdomains from passive DNS sources"""
self.logger.info(f"Querying passive DNS sources for {domain}")
        # Example passive DNS sources (API keys required; SecurityTrails expects the key in an APIKEY header rather than the URL)
sources = {
"virustotal": f"https://www.virustotal.com/vtapi/v2/domain/report?apikey={self.config.get('api_keys', {}).get('virustotal', '')}&domain={domain}",
"securitytrails": f"https://api.securitytrails.com/v1/domain/{domain}/subdomains"
}
results = []
for source_name, url in sources.items():
try:
async with self.session.get(url) as response:
if response.status == 200:
data = await response.json()
source_results = self.parse_passive_dns_response(
data, domain, source_name
)
results.extend(source_results)
except Exception as e:
self.logger.error(f"Passive DNS query failed for {source_name}: {e}")
return results
def parse_passive_dns_response(self, data: Dict, domain: str, source: str) -> List[SubdomainResult]:
"""Parse passive DNS response data"""
results = []
# Handle different response formats
if source == "virustotal" and "subdomains" in data:
for subdomain in data["subdomains"]:
full_domain = f"{subdomain}.{domain}"
results.append(SubdomainResult(
subdomain=full_domain,
source=f"Passive DNS ({source})",
timestamp=time.time()
))
return results
async def validate_and_enrich_results(self):
"""Validate discovered subdomains and enrich with additional data"""
self.logger.info("Validating and enriching results")
# Remove duplicates while preserving metadata
unique_subdomains = {}
for result in self.detailed_results:
if result.subdomain not in unique_subdomains:
unique_subdomains[result.subdomain] = result
else:
# Merge sources
existing = unique_subdomains[result.subdomain]
existing.source += f", {result.source}"
self.detailed_results = list(unique_subdomains.values())
# Enrich with HTTP status codes
semaphore = asyncio.Semaphore(self.config["rate_limits"]["http_requests"])
async def check_http_status(result: SubdomainResult):
async with semaphore:
try:
async with self.session.get(
f"https://{result.subdomain}",
timeout=aiohttp.ClientTimeout(total=self.config["timeouts"]["http"])
) as response:
result.http_status = response.status
except Exception:
try:
async with self.session.get(
f"http://{result.subdomain}",
timeout=aiohttp.ClientTimeout(total=self.config["timeouts"]["http"])
) as response:
result.http_status = response.status
except Exception:
result.http_status = None
# Check HTTP status for all results
tasks = [check_http_status(result) for result in self.detailed_results]
await asyncio.gather(*tasks, return_exceptions=True)
    def export_results(self, format: str = "json", filename: Optional[str] = None) -> str:
"""Export results in various formats"""
if not filename:
timestamp = int(time.time())
filename = f"subdomain_results_{timestamp}.{format}"
if format == "json":
data = {
"timestamp": time.time(),
"total_subdomains": len(self.detailed_results),
"results": [
{
"subdomain": r.subdomain,
"source": r.source,
"timestamp": r.timestamp,
"ip_addresses": r.ip_addresses,
"http_status": r.http_status
}
for r in self.detailed_results
]
}
with open(filename, 'w') as f:
json.dump(data, f, indent=2)
elif format == "csv":
import csv
with open(filename, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['Subdomain', 'Source', 'IP Addresses', 'HTTP Status'])
for result in self.detailed_results:
writer.writerow([
result.subdomain,
result.source,
','.join(result.ip_addresses or []),
result.http_status
])
return filename
# Usage example
async def main():
framework = SubdomainDiscoveryFramework()
results = await framework.discover_subdomains("example.com")
print(f"Discovered {len(results)} unique subdomains")
output_file = framework.export_results("json")
print(f"Results exported to {output_file}")
if __name__ == "__main__":
asyncio.run(main())
Configuration Management
Create flexible configuration files to customize behavior:
{
"sources": {
"certificate_transparency": true,
"dns_brute_force": true,
"passive_dns": true,
"search_engines": false,
"github_search": true
},
"api_keys": {
"virustotal": "your_vt_api_key",
"securitytrails": "your_st_api_key",
"shodan": "your_shodan_api_key"
},
"rate_limits": {
"ct_logs": 10,
"dns_queries": 100,
"http_requests": 50,
"api_calls": 5
},
"timeouts": {
"dns": 5,
"http": 10,
"api": 30,
"total": 600
},
"wordlists": {
"common": "wordlists/common_subdomains.txt",
"comprehensive": "wordlists/all_subdomains.txt",
"custom": "wordlists/company_specific.txt"
},
"output": {
"formats": ["json", "csv", "xml"],
"directory": "results",
"include_metadata": true
},
"notifications": {
"slack_webhook": "https://hooks.slack.com/...",
"email": "security@company.com",
"telegram_bot_token": "your_bot_token"
}
}
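One detail the simple load_config above glosses over is merging a partial user config over the defaults, so a file that only overrides api_keys does not silently drop the rate_limits or timeouts sections. A minimal sketch of that merge (merge_config and load_merged_config are illustrative helpers, not part of the framework):
import json
from pathlib import Path
from typing import Dict
def merge_config(defaults: Dict, overrides: Dict) -> Dict:
    """Recursively overlay user-supplied values on top of the defaults"""
    merged = dict(defaults)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_config(merged[key], value)
        else:
            merged[key] = value
    return merged
def load_merged_config(path: str, defaults: Dict) -> Dict:
    """Load the JSON config if present and overlay it on the built-in defaults"""
    config_path = Path(path)
    if not config_path.exists():
        return defaults
    with config_path.open() as f:
        return merge_config(defaults, json.load(f))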
Go-Based High-Performance Scanner
Go excels at building high-performance, concurrent applications:
Core Scanner Implementation
package main
import (
"bufio"
"context"
"encoding/json"
"fmt"
"log"
"net"
"net/http"
"os"
"strings"
"sync"
"time"
)
type SubdomainResult struct {
Subdomain string `json:"subdomain"`
IPAddresses []string `json:"ip_addresses"`
Source string `json:"source"`
HTTPStatus int `json:"http_status"`
Timestamp time.Time `json:"timestamp"`
}
type Scanner struct {
domain string
wordlist []string
results []SubdomainResult
resultsMu sync.Mutex
concurrency int
timeout time.Duration
httpClient *http.Client
resolver *net.Resolver
}
func NewScanner(domain string, concurrency int) *Scanner {
return &Scanner{
domain: domain,
concurrency: concurrency,
timeout: 5 * time.Second,
httpClient: &http.Client{
Timeout: 10 * time.Second,
Transport: &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 30 * time.Second,
},
},
resolver: &net.Resolver{
PreferGo: true,
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
d := net.Dialer{
Timeout: 2 * time.Second,
}
return d.DialContext(ctx, network, address)
},
},
}
}
func (s *Scanner) LoadWordlist(filename string) error {
file, err := os.Open(filename)
if err != nil {
return err
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
word := strings.TrimSpace(scanner.Text())
if word != "" && !strings.HasPrefix(word, "#") {
s.wordlist = append(s.wordlist, word)
}
}
return scanner.Err()
}
func (s *Scanner) ScanDNS() {
log.Printf("Starting DNS scan for %s with %d workers", s.domain, s.concurrency)
jobs := make(chan string, len(s.wordlist))
var wg sync.WaitGroup
// Start workers
for i := 0; i < s.concurrency; i++ {
wg.Add(1)
go s.dnsWorker(jobs, &wg)
}
// Send jobs
for _, word := range s.wordlist {
jobs <- word
}
close(jobs)
wg.Wait()
log.Printf("DNS scan completed. Found %d subdomains", len(s.results))
}
func (s *Scanner) dnsWorker(jobs <-chan string, wg *sync.WaitGroup) {
defer wg.Done()
for word := range jobs {
subdomain := fmt.Sprintf("%s.%s", word, s.domain)
ctx, cancel := context.WithTimeout(context.Background(), s.timeout)
ips, err := s.resolver.LookupHost(ctx, subdomain)
cancel()
if err == nil && len(ips) > 0 {
result := SubdomainResult{
Subdomain: subdomain,
IPAddresses: ips,
Source: "DNS Brute Force",
Timestamp: time.Now(),
}
s.resultsMu.Lock()
s.results = append(s.results, result)
s.resultsMu.Unlock()
log.Printf("Found: %s -> %v", subdomain, ips)
}
}
}
func (s *Scanner) EnrichWithHTTPStatus() {
log.Printf("Enriching %d subdomains with HTTP status", len(s.results))
jobs := make(chan *SubdomainResult, len(s.results))
var wg sync.WaitGroup
// Start workers
for i := 0; i < s.concurrency; i++ {
wg.Add(1)
go s.httpWorker(jobs, &wg)
}
// Send jobs
for i := range s.results {
jobs <- &s.results[i]
}
close(jobs)
wg.Wait()
log.Printf("HTTP enrichment completed")
}
func (s *Scanner) httpWorker(jobs <-chan *SubdomainResult, wg *sync.WaitGroup) {
defer wg.Done()
for result := range jobs {
// Try HTTPS first, then HTTP
urls := []string{
fmt.Sprintf("https://%s", result.Subdomain),
fmt.Sprintf("http://%s", result.Subdomain),
}
for _, url := range urls {
resp, err := s.httpClient.Get(url)
if err == nil {
result.HTTPStatus = resp.StatusCode
resp.Body.Close()
break
}
}
}
}
func (s *Scanner) DiscoverFromCT() error {
log.Printf("Querying Certificate Transparency logs for %s", s.domain)
url := fmt.Sprintf("https://crt.sh/?q=%%25.%s&output=json", s.domain)
resp, err := s.httpClient.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
var ctEntries []struct {
NameValue string `json:"name_value"`
}
if err := json.NewDecoder(resp.Body).Decode(&ctEntries); err != nil {
return err
}
seen := make(map[string]bool)
for _, entry := range ctEntries {
names := strings.Split(entry.NameValue, "\n")
for _, name := range names {
name = strings.TrimSpace(strings.ToLower(name))
if strings.HasPrefix(name, "*.") {
name = name[2:]
}
if (strings.HasSuffix(name, "."+s.domain) || name == s.domain) && !seen[name] {
seen[name] = true
result := SubdomainResult{
Subdomain: name,
Source: "Certificate Transparency",
Timestamp: time.Now(),
}
s.resultsMu.Lock()
s.results = append(s.results, result)
s.resultsMu.Unlock()
}
}
}
log.Printf("Found %d subdomains from CT logs", len(seen))
return nil
}
func (s *Scanner) ExportResults(filename string) error {
data := struct {
Domain string `json:"domain"`
Timestamp time.Time `json:"timestamp"`
Count int `json:"count"`
Results []SubdomainResult `json:"results"`
}{
Domain: s.domain,
Timestamp: time.Now(),
Count: len(s.results),
Results: s.results,
}
file, err := os.Create(filename)
if err != nil {
return err
}
defer file.Close()
encoder := json.NewEncoder(file)
encoder.SetIndent("", " ")
return encoder.Encode(data)
}
func main() {
if len(os.Args) < 3 {
log.Fatal("Usage: scanner <domain> <wordlist>")
}
domain := os.Args[1]
wordlistFile := os.Args[2]
scanner := NewScanner(domain, 50) // 50 concurrent workers
// Load wordlist
if err := scanner.LoadWordlist(wordlistFile); err != nil {
log.Fatalf("Failed to load wordlist: %v", err)
}
// Discover from Certificate Transparency
if err := scanner.DiscoverFromCT(); err != nil {
log.Printf("CT discovery failed: %v", err)
}
// DNS brute force
scanner.ScanDNS()
// Enrich with HTTP status
scanner.EnrichWithHTTPStatus()
// Export results
outputFile := fmt.Sprintf("%s_results_%d.json", domain, time.Now().Unix())
if err := scanner.ExportResults(outputFile); err != nil {
log.Fatalf("Failed to export results: %v", err)
}
log.Printf("Results exported to %s", outputFile)
}
Bash-Based Pipeline Orchestration
Bash excels at orchestrating multiple tools and creating flexible pipelines:
Master Pipeline Script
#!/bin/bash
# Comprehensive Subdomain Discovery Pipeline
# Orchestrates multiple tools and data sources
set -euo pipefail
# Configuration
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
CONFIG_FILE="${SCRIPT_DIR}/config.conf"
TOOLS_DIR="${SCRIPT_DIR}/tools"
WORDLISTS_DIR="${SCRIPT_DIR}/wordlists"
RESULTS_DIR="${SCRIPT_DIR}/results"
# Load configuration
source "$CONFIG_FILE" 2>/dev/null || {
echo "Warning: Config file not found, using defaults"
CONCURRENCY=50
TIMEOUT=30
HTTP_TIMEOUT=10
}
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Logging functions
log_info() {
echo -e "${BLUE}[INFO]${NC} $(date '+%H:%M:%S') $1"
}
log_success() {
echo -e "${GREEN}[SUCCESS]${NC} $(date '+%H:%M:%S') $1"
}
log_warning() {
echo -e "${YELLOW}[WARNING]${NC} $(date '+%H:%M:%S') $1"
}
log_error() {
echo -e "${RED}[ERROR]${NC} $(date '+%H:%M:%S') $1"
}
# Utility functions
check_dependencies() {
local deps=("curl" "dig" "jq" "subfinder" "amass" "httpx")
local missing=()
for dep in "${deps[@]}"; do
if ! command -v "$dep" &> /dev/null; then
missing+=("$dep")
fi
done
if [ ${#missing[@]} -ne 0 ]; then
log_error "Missing dependencies: ${missing[*]}"
log_info "Install missing tools and try again"
exit 1
fi
}
create_directories() {
mkdir -p "$RESULTS_DIR"/{raw,processed,reports}
mkdir -p "$RESULTS_DIR/raw"/{ct,dns,passive,active}
}
setup_session() {
local domain=$1
local timestamp=$(date +%Y%m%d_%H%M%S)
export SESSION_DIR="$RESULTS_DIR/${domain}_${timestamp}"
export RAW_DIR="$SESSION_DIR/raw"
export PROCESSED_DIR="$SESSION_DIR/processed"
mkdir -p "$RAW_DIR"/{ct,dns,passive,active}
mkdir -p "$PROCESSED_DIR"
log_info "Session directory: $SESSION_DIR"
}
# Data collection functions
collect_ct_data() {
local domain=$1
log_info "Collecting Certificate Transparency data for $domain"
# crt.sh
log_info "Querying crt.sh..."
curl -s "https://crt.sh/?q=%25.$domain&output=json" | \
jq -r '.[].name_value' | \
tr ',' '\n' | \
sort -u > "$RAW_DIR/ct/crtsh.txt"
# Certspotter
log_info "Querying Certspotter..."
curl -s "https://api.certspotter.com/v1/issuances?domain=$domain&include_subdomains=true&expand=dns_names" | \
jq -r '.[].dns_names[]?' | \
sort -u > "$RAW_DIR/ct/certspotter.txt"
# Facebook CT API
log_info "Querying Facebook CT API..."
curl -s "https://graph.facebook.com/certificates?query=$domain&limit=10000&access_token=\${FB_ACCESS_TOKEN}" | \
jq -r '.data[].domains[]?' | \
sort -u > "$RAW_DIR/ct/facebook.txt" 2>/dev/null || log_warning "Facebook CT query failed"
# Combine CT results
cat "$RAW_DIR/ct/"*.txt | \
grep -E "\.$domain$|^$domain$" | \
sed 's/^\*\.//g' | \
sort -u > "$RAW_DIR/ct/all_ct.txt"
local ct_count=$(wc -l < "$RAW_DIR/ct/all_ct.txt")
log_success "Collected $ct_count subdomains from CT logs"
}
collect_passive_data() {
local domain=$1
log_info "Collecting passive reconnaissance data for $domain"
# Subfinder
log_info "Running Subfinder..."
subfinder -d "$domain" -all -recursive -silent -o "$RAW_DIR/passive/subfinder.txt"
# Amass passive
log_info "Running Amass passive enumeration..."
amass enum -passive -d "$domain" -config "$SCRIPT_DIR/amass_config.yaml" -o "$RAW_DIR/passive/amass.txt"
# AssetFinder
log_info "Running AssetFinder..."
echo "$domain" | assetfinder --subs-only > "$RAW_DIR/passive/assetfinder.txt"
# Findomain
log_info "Running Findomain..."
findomain -t "$domain" -u "$RAW_DIR/passive/findomain.txt" 2>/dev/null || log_warning "Findomain failed"
# Combine passive results
cat "$RAW_DIR/passive/"*.txt | \
sort -u > "$RAW_DIR/passive/all_passive.txt"
local passive_count=$(wc -l < "$RAW_DIR/passive/all_passive.txt")
log_success "Collected $passive_count subdomains from passive sources"
}
collect_active_data() {
local domain=$1
log_info "Performing active DNS enumeration for $domain"
# DNS brute force with multiple wordlists
local wordlists=("$WORDLISTS_DIR/common.txt" "$WORDLISTS_DIR/comprehensive.txt")
for wordlist in "${wordlists[@]}"; do
if [ -f "$wordlist" ]; then
local wordlist_name=$(basename "$wordlist" .txt)
log_info "DNS brute force with $wordlist_name wordlist..."
# Using massdns for high-speed resolution
if command -v massdns &> /dev/null; then
sed "s/$/.$domain/" "$wordlist" | \
massdns -r /etc/resolv.conf -t A -o S -w "$RAW_DIR/active/massdns_${wordlist_name}.txt"
grep -E "^[^;].*\sA\s" "$RAW_DIR/active/massdns_${wordlist_name}.txt" | \
cut -d' ' -f1 | \
sed 's/\.$//' > "$RAW_DIR/active/resolved_${wordlist_name}.txt"
else
# Fallback to custom DNS resolution
parallel -j "$CONCURRENCY" --timeout "$TIMEOUT" \
"dig +short {1}.$domain A | grep -E '^[0-9]+\.' && echo {1}.$domain" \
:::: "$wordlist" > "$RAW_DIR/active/resolved_${wordlist_name}.txt" 2>/dev/null
fi
fi
done
# Zone transfer attempts
log_info "Attempting zone transfers..."
for ns in $(dig +short NS "$domain"); do
log_info "Trying zone transfer from $ns..."
dig @"$ns" AXFR "$domain" | \
grep -E "^[^;].*\sA\s" | \
awk '{print $1}' | \
sed 's/\.$//' >> "$RAW_DIR/active/zone_transfer.txt" 2>/dev/null || true
done
# Combine active results
cat "$RAW_DIR/active/"*.txt | \
sort -u > "$RAW_DIR/active/all_active.txt"
local active_count=$(wc -l < "$RAW_DIR/active/all_active.txt")
log_success "Collected $active_count subdomains from active enumeration"
}
generate_permutations() {
local domain=$1
log_info "Generating subdomain permutations for $domain"
# Collect all discovered subdomains
cat "$RAW_DIR"/*/*.txt | sort -u > "$PROCESSED_DIR/all_discovered.txt"
# Generate permutations using altdns
if command -v altdns &> /dev/null; then
log_info "Generating permutations with altdns..."
altdns -i "$PROCESSED_DIR/all_discovered.txt" \
-o "$PROCESSED_DIR/permutations.txt" \
-w "$WORDLISTS_DIR/permutation_words.txt" \
-r -s /dev/null
fi
# Custom permutation logic
log_info "Generating custom permutations..."
while read -r subdomain; do
local base=$(echo "$subdomain" | sed "s/\.$domain$//")
# Generate variations
echo "${base}-dev.$domain"
echo "${base}-staging.$domain"
echo "${base}-test.$domain"
echo "${base}01.$domain"
echo "dev-${base}.$domain"
echo "staging-${base}.$domain"
echo "test-${base}.$domain"
done < "$PROCESSED_DIR/all_discovered.txt" >> "$PROCESSED_DIR/custom_permutations.txt"
# Combine all permutations
cat "$PROCESSED_DIR/permutations.txt" "$PROCESSED_DIR/custom_permutations.txt" | \
sort -u > "$PROCESSED_DIR/all_permutations.txt"
local perm_count=$(wc -l < "$PROCESSED_DIR/all_permutations.txt")
log_success "Generated $perm_count permutations"
}
validate_subdomains() {
local domain=$1
log_info "Validating discovered subdomains for $domain"
# Combine all sources
cat "$RAW_DIR"/*/*.txt "$PROCESSED_DIR/all_permutations.txt" | \
sort -u > "$PROCESSED_DIR/all_candidates.txt"
# DNS validation
log_info "Validating DNS resolution..."
if command -v massdns &> /dev/null; then
massdns -r /etc/resolv.conf -t A -o S -w "$PROCESSED_DIR/dns_validation.txt" "$PROCESSED_DIR/all_candidates.txt"
grep -E "^[^;].*\sA\s" "$PROCESSED_DIR/dns_validation.txt" | \
cut -d' ' -f1 | \
sed 's/\.$//' > "$PROCESSED_DIR/valid_subdomains.txt"
else
# Parallel DNS validation
parallel -j "$CONCURRENCY" --timeout "$TIMEOUT" \
"dig +short {1} A | grep -E '^[0-9]+\.' > /dev/null && echo {1}" \
:::: "$PROCESSED_DIR/all_candidates.txt" > "$PROCESSED_DIR/valid_subdomains.txt" 2>/dev/null
fi
# HTTP validation and enrichment
log_info "Checking HTTP status codes..."
httpx -l "$PROCESSED_DIR/valid_subdomains.txt" \
-silent \
-status-code \
-title \
-tech-detect \
-timeout "$HTTP_TIMEOUT" \
-threads "$CONCURRENCY" \
-o "$PROCESSED_DIR/http_results.txt"
# Extract live subdomains
grep -E "^https?://" "$PROCESSED_DIR/http_results.txt" | \
sed -E 's|^https?://([^/]+).*|\1|' | \
sort -u > "$PROCESSED_DIR/live_subdomains.txt"
local valid_count=$(wc -l < "$PROCESSED_DIR/valid_subdomains.txt")
local live_count=$(wc -l < "$PROCESSED_DIR/live_subdomains.txt")
log_success "Found $valid_count valid subdomains ($live_count with HTTP services)"
}
generate_reports() {
local domain=$1
log_info "Generating comprehensive reports for $domain"
local report_file="$SESSION_DIR/report_${domain}_$(date +%Y%m%d_%H%M%S).txt"
local json_file="$SESSION_DIR/results_${domain}_$(date +%Y%m%d_%H%M%S).json"
# Text report
{
echo "=========================================="
echo "Subdomain Discovery Report"
echo "Domain: $domain"
echo "Timestamp: $(date)"
echo "=========================================="
echo
echo "Summary:"
echo "- Certificate Transparency: $(wc -l < "$RAW_DIR/ct/all_ct.txt") subdomains"
echo "- Passive Sources: $(wc -l < "$RAW_DIR/passive/all_passive.txt") subdomains"
echo "- Active Enumeration: $(wc -l < "$RAW_DIR/active/all_active.txt") subdomains"
echo "- Valid Subdomains: $(wc -l < "$PROCESSED_DIR/valid_subdomains.txt") subdomains"
echo "- Live HTTP Services: $(wc -l < "$PROCESSED_DIR/live_subdomains.txt") subdomains"
echo
echo "Live Subdomains:"
echo "=================="
cat "$PROCESSED_DIR/live_subdomains.txt"
echo
echo "HTTP Status Summary:"
echo "===================="
grep -E "^https?://" "$PROCESSED_DIR/http_results.txt" | \
grep -oE "\[.*\]" | \
sort | uniq -c | sort -nr
} > "$report_file"
# JSON report
{
echo "{"
echo " \"domain\": \"$domain\","
echo " \"timestamp\": \"$(date -Iseconds)\","
echo " \"summary\": {"
echo " \"ct_sources\": $(wc -l < "$RAW_DIR/ct/all_ct.txt"),"
echo " \"passive_sources\": $(wc -l < "$RAW_DIR/passive/all_passive.txt"),"
echo " \"active_enumeration\": $(wc -l < "$RAW_DIR/active/all_active.txt"),"
echo " \"valid_subdomains\": $(wc -l < "$PROCESSED_DIR/valid_subdomains.txt"),"
echo " \"live_services\": $(wc -l < "$PROCESSED_DIR/live_subdomains.txt")"
echo " },"
echo " \"subdomains\": ["
local first=true
while read -r subdomain; do
[ "$first" = true ] && first=false || echo ","
echo -n " \"$subdomain\""
done < "$PROCESSED_DIR/valid_subdomains.txt"
echo
echo " ]"
echo "}"
} > "$json_file"
log_success "Reports generated:"
log_success "- Text report: $report_file"
log_success "- JSON report: $json_file"
}
send_notifications() {
local domain=$1
local results_count=$(wc -l < "$PROCESSED_DIR/valid_subdomains.txt")
# Slack notification
if [ -n "${SLACK_WEBHOOK:-}" ]; then
curl -X POST -H 'Content-type: application/json' \
--data "{\"text\":\"š Subdomain discovery completed for \`$domain\`\nš Found $results_count valid subdomains\nš Results: $SESSION_DIR\"}" \
"$SLACK_WEBHOOK" > /dev/null 2>&1
fi
# Email notification
if [ -n "${EMAIL_RECIPIENT:-}" ]; then
echo "Subdomain discovery completed for $domain. Found $results_count valid subdomains." | \
mail -s "Subdomain Discovery Report - $domain" "$EMAIL_RECIPIENT"
fi
}
# Main execution function
main() {
local domain=$1
log_info "Starting comprehensive subdomain discovery for $domain"
# Setup
check_dependencies
create_directories
setup_session "$domain"
# Data collection phase
collect_ct_data "$domain"
collect_passive_data "$domain"
collect_active_data "$domain"
# Processing phase
generate_permutations "$domain"
validate_subdomains "$domain"
# Reporting phase
generate_reports "$domain"
send_notifications "$domain"
log_success "Subdomain discovery pipeline completed for $domain"
log_info "Results available in: $SESSION_DIR"
}
# Script entry point
if [ $# -eq 0 ]; then
echo "Usage: $0 $lt;domain>"
echo "Example: $0 example.com"
exit 1
fi
main "$1"
CI/CD Integration and Continuous Monitoring
Integrate subdomain discovery into development workflows:
GitHub Actions Workflow
name: Subdomain Discovery and Monitoring
on:
schedule:
- cron: '0 2 * * *' # Daily at 2 AM
push:
branches: [ main ]
paths: [ 'domains.txt' ]
workflow_dispatch:
jobs:
subdomain-discovery:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: '3.9'
- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install -y dnsutils curl jq parallel
# Install Go tools
go install github.com/projectdiscovery/subfinder/v2/cmd/subfinder@latest
go install github.com/OWASP/Amass/v3/...@latest
go install github.com/projectdiscovery/httpx/cmd/httpx@latest
# Install Python dependencies
pip install aiohttp dnspython
- name: Run subdomain discovery
env:
VT_API_KEY: ${{ secrets.VIRUSTOTAL_API_KEY }}
ST_API_KEY: ${{ secrets.SECURITYTRAILS_API_KEY }}
run: |
mkdir -p results
while read domain; do
echo "Processing $domain..."
python3 scripts/subdomain_discovery.py "$domain" --output "results/${domain}_results.json"
done < domains.txt
- name: Compare with previous results
run: |
python3 scripts/compare_results.py
- name: Generate summary report
run: |
python3 scripts/generate_report.py
- name: Upload results
uses: actions/upload-artifact@v3
with:
name: subdomain-results
path: results/
- name: Notify on new discoveries
if: env.NEW_SUBDOMAINS == 'true'
env:
SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK }}
run: |
curl -X POST -H 'Content-type: application/json' \
--data @notification_payload.json \
"$SLACK_WEBHOOK"
Docker-Based Scalable Solution
# Dockerfile for subdomain discovery service
FROM python:3.9-slim
# Install system dependencies
RUN apt-get update && apt-get install -y \
dnsutils \
curl \
jq \
parallel \
git \
&& rm -rf /var/lib/apt/lists/*
# Install Go and Go-based tools
RUN curl -L https://golang.org/dl/go1.19.linux-amd64.tar.gz | tar -C /usr/local -xz
ENV PATH="/usr/local/go/bin:${PATH}"
# Install Go-based tools into /usr/local/bin so they remain on PATH for the non-root user
ENV GOBIN=/usr/local/bin
RUN go install github.com/projectdiscovery/subfinder/v2/cmd/subfinder@latest && \
    go install github.com/OWASP/Amass/v3/...@latest && \
    go install github.com/projectdiscovery/httpx/cmd/httpx@latest
# Copy application code
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
# Create non-root user
RUN useradd -m -u 1000 scanner
USER scanner
# Default command
CMD ["python3", "main.py"]
Kubernetes Deployment
apiVersion: batch/v1
kind: CronJob
metadata:
name: subdomain-discovery
spec:
schedule: "0 2 * * *" # Daily at 2 AM
jobTemplate:
spec:
template:
spec:
containers:
- name: subdomain-scanner
image: your-registry/subdomain-discovery:latest
env:
- name: TARGET_DOMAINS
valueFrom:
configMapKeyRef:
name: scanner-config
key: domains
- name: API_KEYS
valueFrom:
secretKeyRef:
name: scanner-secrets
key: api-keys
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "2"
volumeMounts:
- name: results-storage
mountPath: /app/results
volumes:
- name: results-storage
persistentVolumeClaim:
claimName: scanner-results-pvc
restartPolicy: OnFailure
Performance Optimization and Scaling
Caching and Rate Limiting
import json
import redis
import time
from functools import wraps
class APICache:
def __init__(self, redis_host='localhost', redis_port=6379, ttl=3600):
self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
self.ttl = ttl
def cache_key(self, func_name, *args, **kwargs):
"""Generate cache key from function name and arguments"""
key_parts = [func_name] + [str(arg) for arg in args]
key_parts.extend([f"{k}:{v}" for k, v in sorted(kwargs.items())])
return ":".join(key_parts)
def cached_api_call(self, ttl=None):
"""Decorator for caching API calls"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
cache_key = self.cache_key(func.__name__, *args, **kwargs)
# Try to get from cache
cached_result = self.redis_client.get(cache_key)
if cached_result:
return json.loads(cached_result)
# Call function and cache result
result = func(*args, **kwargs)
if result:
cache_ttl = ttl or self.ttl
self.redis_client.setex(cache_key, cache_ttl, json.dumps(result))
return result
return wrapper
return decorator
class RateLimiter:
def __init__(self, max_calls=100, time_window=60):
self.max_calls = max_calls
self.time_window = time_window
self.calls = []
def __call__(self, func):
@wraps(func)
def wrapper(*args, **kwargs):
now = time.time()
# Remove old calls outside time window
self.calls = [call_time for call_time in self.calls if now - call_time < self.time_window]
# Check if we're at the limit
if len(self.calls) >= self.max_calls:
sleep_time = self.time_window - (now - self.calls[0])
if sleep_time > 0:
time.sleep(sleep_time)
# Record this call
self.calls.append(now)
return func(*args, **kwargs)
return wrapper
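Applied together, the cache and rate limiter might wrap a passive DNS lookup like this. This is an illustrative sketch only: it assumes a local Redis instance, and query_securitytrails, the endpoint, and the APIKEY header are examples rather than a prescribed integration:
import requests
cache = APICache(ttl=3600)
@cache.cached_api_call(ttl=1800)
@RateLimiter(max_calls=60, time_window=60)
def query_securitytrails(domain: str, api_key: str) -> dict:
    """Cached for 30 minutes; at most 60 outbound calls per minute"""
    response = requests.get(
        f"https://api.securitytrails.com/v1/domain/{domain}/subdomains",
        headers={"APIKEY": api_key},
        timeout=30,
    )
    response.raise_for_status()
    return response.json()
Note that cache_key builds the Redis key from every argument, so a secret passed as a parameter ends up in key names; in practice the API key is better read from the environment inside the function.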
Distributed Processing
import asyncio
import aioredis
import json
import logging
import time
from dataclasses import dataclass
from typing import List
@dataclass
class DiscoveryTask:
domain: str
sources: List[str]
priority: int = 1
class DistributedScanner:
def __init__(self, redis_url='redis://localhost'):
self.redis_url = redis_url
self.task_queue = 'subdomain_tasks'
self.result_queue = 'subdomain_results'
async def submit_task(self, task: DiscoveryTask):
"""Submit a discovery task to the distributed queue"""
redis = await aioredis.from_url(self.redis_url)
task_data = {
'domain': task.domain,
'sources': task.sources,
'priority': task.priority,
'timestamp': time.time()
}
await redis.lpush(self.task_queue, json.dumps(task_data))
await redis.close()
async def process_tasks(self, worker_id: str):
"""Worker process to handle discovery tasks"""
redis = await aioredis.from_url(self.redis_url)
while True:
try:
# Get task from queue
task_data = await redis.brpop(self.task_queue, timeout=30)
if not task_data:
continue
task = json.loads(task_data[1])
                # Process the task (discover_subdomains is assumed to delegate to the discovery framework above)
results = await self.discover_subdomains(
task['domain'],
task['sources']
)
# Store results
result_data = {
'worker_id': worker_id,
'domain': task['domain'],
'results': results,
'timestamp': time.time()
}
await redis.lpush(self.result_queue, json.dumps(result_data))
except Exception as e:
logging.error(f"Worker {worker_id} error: {e}")
await asyncio.sleep(5)
await redis.close()
async def get_results(self, domain: str, timeout: int = 300):
"""Collect results for a specific domain"""
redis = await aioredis.from_url(self.redis_url)
start_time = time.time()
results = []
while time.time() - start_time < timeout:
result_data = await redis.brpop(self.result_queue, timeout=10)
if result_data:
result = json.loads(result_data[1])
if result['domain'] == domain:
results.extend(result['results'])
else:
break
await redis.close()
return results
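A minimal way to exercise the distributed scanner is to submit a task from a coordinator process and run process_tasks in one or more worker processes. The domain, sources, and worker id below are placeholders, and the sketch assumes discover_subdomains has been implemented (for example by delegating to the framework shown earlier):
async def submit_example():
    scanner = DistributedScanner(redis_url="redis://localhost")
    await scanner.submit_task(DiscoveryTask(
        domain="example.com",
        sources=["certificate_transparency", "dns_brute_force"],
        priority=1,
    ))
    results = await scanner.get_results("example.com", timeout=300)
    print(f"Collected {len(results)} results")
async def worker_example():
    # Each worker node runs this loop and pulls tasks from the shared queue
    scanner = DistributedScanner(redis_url="redis://localhost")
    await scanner.process_tasks(worker_id="worker-1")
if __name__ == "__main__":
    asyncio.run(submit_example())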
Conclusion
Building effective subdomain discovery automation requires careful consideration of architecture, performance, and scalability. The examples in this guide demonstrate how to create sophisticated systems that can handle enterprise-scale reconnaissance while maintaining flexibility and reliability.
Key takeaways for successful automation include:
- Modular design: Build systems that can easily incorporate new data sources and techniques
- Intelligent rate limiting: Respect API limits and avoid detection while maintaining performance
- Error handling: Gracefully handle failures and continue processing
- Scalability: Design for horizontal scaling and distributed processing
- Integration: Seamlessly integrate with existing security workflows and tools
As organizations continue to expand their digital footprint and adopt cloud-native architectures, automated subdomain discovery becomes increasingly critical. The frameworks and techniques presented here provide a solid foundation for building production-ready systems that can evolve with changing requirements and emerging technologies.