Hybrid Unifier
Description
Write Up: Tyron
Créateur: r4sti
Difficulté: very easy
Points: 975
Format du flag: HTB{printable}
Enoncé
-
Dans les profondeurs d'une ancienne bibliothèque, un vieux manuscrit renferme la clé d'un pouvoir invisible. Les érudits qui osaient en percer les secrets devaient d'abord échanger une série de symboles cryptés, formant ainsi un lien que personne ne pouvait rompre. Au fur et à mesure qu'ils établissaient leur connexion, des couches de protection s'enroulaient autour d'eux comme des chaînes invisibles. Mais alors que le dernier chiffre était établi, une prise de conscience glaçante s'est produite : le lien qu'ils avaient forgé était désormais lié à quelque chose de bien plus sombre, quelque chose qui les observait depuis l'ombre.
-
In the depths of an ancient library, an old manuscript held the key to an unseen power. Scholars who dared to unlock its secrets would first exchange a series of encrypted symbols, forming a bond no one could break. As they secured their connection, layers of protection wrapped around them like invisible chains. But as the final cipher was set, a chilling realization struck—the connection they forged was now bound to something far darker, something watching from the shadows.
Pièce(s) jointe(s):
Solution détaillée
Dans ce challenge on a une API à disposition avec plusieurs point d'accès qui permettent entre autres d'initier une session "sécurisée", générer une clé privée et intéragir avec le serveur par messages chiffrés.
Lecture du code source
app.py
Ce bout de code est lancé pour initialiser le serveur, on apprend donc qu'il s'agit d'une application Flask qui va interpréter le blueprint bp du fichier views.py, y a pas grand chose à comprendre à part que le fichier views.py va nous etre utile.
views.py
from app import app
from flask import Blueprint, render_template, request, session, make_response, redirect, url_for, flash, jsonify
from crypto.session import SecureSession
bp = Blueprint('views', __name__)
session = SecureSession(384)
# Step 1. Request the Diffie Hellman parameters
@bp.route('/api/request-session-parameters', methods=['POST'])
def get_session_parameters():
return jsonify({'g': hex(session.g), 'p': hex(session.p)})
# Step 2. Initialize a secure session with the server by sending your Diffie Hellman public key
@bp.route('/api/init-session', methods=['POST'])
def init_session():
if session.initialized:
return jsonify({'status_code': 400, 'error': 'A secure session has already been established.'})
data = request.json
if 'client_public_key' not in data:
return jsonify({'status_code': 400, 'error': 'You need to send the client public key.'})
client_public_key = data['client_public_key']
session.establish_session_key(client_public_key)
session.initialized = True
return jsonify({'status_code': 200, 'success': 'A secure session was successfully established. There will be E2E encryption for the rest of the communication.', 'server_public_key': hex(session.server_public_key)})
# Step 3. Request an encrypted challenge.
@bp.route('/api/request-challenge', methods=['POST'])
def request_challenge():
if not session.initialized:
return jsonify({'status_code': 400, 'error': 'A secure server-client session has to be established first.'})
return jsonify({'encrypted_challenge': session.get_encrypted_challenge().decode()})
# Step 4. Authenticate by responding to the challenge and send an encrypted packet with 'flag' as action to get the flag.
@bp.route('/api/dashboard', methods=['POST'])
def access_secret():
if not session.initialized:
return jsonify({'status_code': 400, 'error': 'A secure server-client session has to be established first.'})
data = request.json
if 'challenge' not in data:
return jsonify({'status_code': 400, 'error': 'You need to send the hash of the challenge.'})
if 'packet_data' not in data:
return jsonify({'status_code': 400, 'error': 'Empty packet.'})
challenge_hash = data['challenge']
if not session.validate_challenge(challenge_hash):
return jsonify({'status_code': 401, 'error': 'Invalid challenge! Something wrong? You can visit /request-challenge to get a new challenge!'})
encrypted_packet = data['packet_data']
packet = session.decrypt_packet(encrypted_packet)
if not 'packet_data' in packet:
return jsonify({'status_code': 400, 'error': packet['error']})
action = packet['packet_data']
if action == 'flag':
return jsonify(session.encrypt_packet(open('/flag.txt').read()))
elif action == 'about':
return jsonify(session.encrypt_packet('[+] Welcome to my custom API! You are currently Alpha testing my new E2E protocol.\nTo get the flag, all you have to do is to follow the protocol as intended. For any bugs, feel free to contact us :-] !'))
else:
return jsonify(session.encrypt_packet('[!] Unknown action.'))
C'est ce code qui va définir les différents point d'accès de l'API.
Un fichier README.pdf nous est fourni pour tout nous expliquer:
Seule les requetes POST sont autorisées.
/api/request-session-parameters
Vous pouvez utiliser ce point d'accès pour obtenir les paramètres Diffie Hellman. Vous en aurez besoin pour l'initialisation de la session sécurisée.
/api/init-session
Vous pouvez utiliser ce point d'accès pour établir une session sécurisée avec le serveur. Vous recevez la clé publique Diffie Hellman du serveur et vous lui envoyez la vôtre.
/api/request-challenge
Vous pouvez utiliser ce point d'accès pour demander un défi chiffré au serveur. Il est chiffré avec
la clé de session partagée. Ce défi est nécessaire pour l'authentification afin d'interagir avec le point de terminaison /api/dashboard.
/api/dashboard
Vous pouvez utiliser ce point d'accès pour envoyer l'action que vous souhaitez au serveur. Les actions disponibles pour le moment sont sont flag et about. Gardez à l'esprit que vous devez envoyer un paquet chiffré.
Les points d'accès /api/request-challenge et /api/dashboard ne sont accessibles qu'après l'établissement d'une session sécurisée. Pour cela, vous devez d'abord interagir avec le point de terminaison /api/init-session.
Pour intéragir avec l'API, on peut utiliser le module requests de python et envoyer des requetes POST:
URL = 'http://localhost:1337'
r = requests.post(f'{URL}/a-cool-endpoint', json={'key1': 'json', 'key2': 'data'})
Pour lire la réponse, on peut utiliser l'attribut content ou text:
print(r.content)
Enfin, le module sur lequel repose les fonctions cryptographique est session.py:
crypto/session.py
from base64 import b64encode as be, b64decode as bd
from Crypto.Util.number import getPrime, long_to_bytes as l2b
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from secrets import randbelow
from hashlib import sha256
import os, json
class SecureSession:
def __init__(self, bits):
self.bits = bits
self.g = 2
self.p = getPrime(self.bits)
self.compute_server_public_key()
self.reset_challenge()
self.initialized = False
def compute_server_public_key(self):
self.a = randbelow(self.p)
self.server_public_key = pow(self.g, self.a, self.p)
def establish_session_key(self, client_public_key):
key = pow(client_public_key, self.a, self.p)
self.session_key = sha256(str(key).encode()).digest()
def reset_challenge(self):
self.challenge = os.urandom(24)
def validate_challenge(self, challenge_hash):
validated = challenge_hash == sha256(self.challenge).hexdigest()
if validated:
self.reset_challenge()
return validated
def encrypt_packet(self, packet):
iv = os.urandom(16)
cipher = AES.new(self.session_key, AES.MODE_CBC, iv)
encrypted_packet = iv + cipher.encrypt(pad(packet.encode(), 16))
return {'packet_data': be(encrypted_packet).decode()}
def decrypt_packet(self, packet):
decoded_packet = bd(packet.encode())
iv = decoded_packet[:16]
encrypted_packet = decoded_packet[16:]
cipher = AES.new(self.session_key, AES.MODE_CBC, iv)
try:
decrypted_packet = unpad(cipher.decrypt(encrypted_packet), 16)
packet_data = decrypted_packet.decode()
except:
return {'error': 'Malformed packet.'}
return {'packet_data': packet_data}
def get_encrypted_challenge(self):
iv = os.urandom(16)
cipher = AES.new(self.session_key, AES.MODE_CBC, iv)
encrypted_challenge = iv + cipher.encrypt(pad(self.challenge, 16))
return be(encrypted_challenge)
Reconnaissance
Au démarrage du serveur, un objet session est initialisé en spécifiant une longueur de 384 bits:
Ainsi, tant que le serveur n'est pas redémarré, les paramètres ne changeront pas.
Dans le fichier views.py il y a des commentaires avec écris step qui nous indique clairement la voie à suivre.
Etape 1
La première étape est de récupérer les paramètres du serveur :
Step 1. Récupérer les paramètres Diffie-Hellman
On peut envoyer une requete POST à l'endpoint /request-session-parameters pour récupérer les paramètres utilisés par serveur avec Burpsuite ou Python.
r = requests.post(f'{url}/api/init-session', json = {'client_public_key':my_pubkey})
# {"g":"0x2","p":"0xf4ac926dddb699f4cb03c005a9f6f89c7505ab5282a3557d2aaf41e822372edd69c2a9d4906a943340bfd87fc08f9743"}
Etape 2
La deuxième est d'envoyer ma clé pblique
Step 2. Initier une session Diffie-Hellman en envoyant une clé publique
@bp.route('/api/init-session', methods=['POST'])
def init_session():
if session.initialized:
return jsonify({'status_code': 400, 'error': 'A secure session has already been established.'})
data = request.json
if 'client_public_key' not in data:
return jsonify({'status_code': 400, 'error': 'You need to send the client public key.'})
client_public_key = data['client_public_key']
session.establish_session_key(client_public_key)
session.initialized = True
return jsonify({'status_code': 200, 'success': 'A secure session was successfully established. There will be E2E encryption for the rest of the communication.', 'server_public_key': hex(session.server_public_key)})
- Si une clé publique a déja été envoyée, une erreur est retournée (pas besoin d'aller regarder
session.initializeden profondeur pour comprendre) - Si je contacte ce point d'accès sans envoyer de clé, une erreur est relevée.
- Si tout se passe bien, la clé publique du serveur m'est renvoyée.
On peut regarder ce qu'il se passe à ce niveau session.establish_session_key(client_public_key).
Car c'est là que va etre définie la clé qui serivra à chiffrer les communications:
crypto/session.py -> establish_session_key()
Donc la clé de session est générée en élevant ma clé publique (notée B par convetion) à la puissance a modulo p : $ B^{a} \mod p $.
D'ailleurs, a est générée dans compute_server_public_key():
crypto/session.py -> compute_server_public_key()
Donc a est aléatoire, et la clé publique du serveur (habituellement notée A) est calculée à partir de a : $ g^{a} \mod p $.
On est en plein dans un échange de clé Diffie-Hellman.
Bob génère sa clé publique B et l'envoie à Alice. Alice génère sa clé publique A et l'envoie à Bob.
Les deux génèrent une clé partagée : $ B^{a} \equiv A^{b} = c \mod p $
Et c'est cette clé qui sera utilisée pour le chiffrement symmétrique.
Donc, je suis censé envoyer ma clé publique. Et générer une clé privée puis avec la clé publique que le serveur m'a donné, calculer la clé partagée.
Il s'agit juste de la mise ne pratique d'un échange de clé diffie-hellamn.
Etape 3
Step 3. Demander un challenge chiffré.
Nécessite que la session soit initiée, puis session.get_encrypted_challenge() est appelé:
crypto/session.py get_encrypted_challenge()
- La variable
challengesera alignée avec du bourrage pour avoir une longueur de octets (la fonction pad()) - puis chiffrée avec AES CBC et la clé de session et un iv aléatoire
- l'IV sera ajouté au début du texte chiffré
- le tout encodé en base64 et envoyé
Donc il suffit de decoder, recupérer l'IV, déchiffré le texte et on a le texte en clair.
Il faut noter que
challengeest généré à l'initalisation de la session :
crypto/session.py -> SecureSession -> init()
et reset_challenge :
En fait c'est juste une chaine de 24 octets aléatoires.
Etape 4
Step 4 : S'authentifier en envoyant l'action flag ET le challenge déchiffré
@bp.route('/api/dashboard', methods=['POST'])
def access_secret():
if not session.initialized:
return jsonify({'status_code': 400, 'error': 'A secure server-client session has to be established first.'})
data = request.json
if 'challenge' not in data:
return jsonify({'status_code': 400, 'error': 'You need to send the hash of the challenge.'})
if 'packet_data' not in data:
return jsonify({'status_code': 400, 'error': 'Empty packet.'})
challenge_hash = data['challenge']
if not session.validate_challenge(challenge_hash):
return jsonify({'status_code': 401, 'error': 'Invalid challenge! Something wrong? You can visit /request-challenge to get a new challenge!'})
encrypted_packet = data['packet_data']
packet = session.decrypt_packet(encrypted_packet)
if not 'packet_data' in packet:
return jsonify({'status_code': 400, 'error': packet['error']})
action = packet['packet_data']
if action == 'flag':
return jsonify(session.encrypt_packet(open('/flag.txt').read()))
elif action == 'about':
return jsonify(session.encrypt_packet('[+] Welcome to my custom API! You are currently Alpha testing my new E2E protocol.\nTo get the flag, all you have to do is to follow the protocol as intended. For any bugs, feel free to contact us :-] !'))
else:
return jsonify(session.encrypt_packet('[!] Unknown action.'))
Pour faire court:
- on doit envoyer le payload à cet endpoint via un json
- envoyer une entrée 'challenge' et une entrée 'packet_data'
- la réussite du challenge sera vérifié à travers la fonction
validate_challenge()(d'ailleurs, si le challenge est réussi, il est changé) packet_dataest censé etre chiffré, si la mauvaise clé est utilisée, on a une erreur, sinon l'action définie danspacket_data(flag ou about) sera exécutée- si l'action est
flag, on récupère le flag
Exploitation
Il n'y a pas vraiment d'exploitation.
On réalise un échange de clé classique, on déchiffre ce que le serveur nous envoie et chiffre nos communications avec le serveur, c'est une sorte de TP.
Mais vu que j'avais pas compris, j'ai cherché une vulnérabilité (alors qu'il y en a pas) et j'ai définie ma clé publique B = $ p - 1 $.
Ce qui fait que lorsque la session_key est générée : le calcul est le suivant :
$ B^{a} \equiv (p-1)^{a} \equiv (p-1+p)^{a} \equiv -1^{a} \mod p $ qui vaut soit 1 soit -1.
Ainsi j'envoie une clé publique qui fait que je peux déviner la clé de session.
Cela est possible grace à l'arithmétique modulaire expliquée sur cryptohack
En résumé:
- on récupère les paramètres
- on initialise la session et génère la clé de session
- on requete le challenge qu'on dechiffre
- on recontacte le serveur avec le challenge chiffré et on recupère le flag
solve_hybrid_unifier.py
from base64 import b64encode as be, b64decode as bd
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from Crypto.Util.number import getPrime, long_to_bytes as l2b
from hashlib import sha256
import requests
import json
import os
def try_decrypt(key, challenge):
challenge = bd(challenge)
iv = challenge[:16]
challenge = challenge[16:]
cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted = unpad(cipher.decrypt(challenge),16)
return sha256(decrypted).hexdigest()
def decrypt_packet(key, packet):
decoded_packet = bd(packet.encode())
iv = decoded_packet[:16]
encrypted_packet = decoded_packet[16:]
cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted_packet = unpad(cipher.decrypt(encrypted_packet), 16)
packet_data = decrypted_packet.decode()
return packet_data
if __name__ == "__main__":
url = "http://94.237.55.180:50582"
r = requests.post(f'{url}/api/request-session-parameters')
data = json.loads(r.content)
print(f"Parameters : {data}")
g = int(data["g"], 16)
p = int(data["p"], 16)
my_pubkey = p-1
# init session
r = requests.post(f'{url}/api/init-session', json = {'client_public_key':my_pubkey})
data = json.loads(r.content)
print(f"Session initialization : {data}")
server_pubkey = int(data["server_public_key"], 16)
key_1 = sha256(str(p-1).encode()).digest()
key_2 = sha256(str(1).encode()).digest()
#get challenge
r = requests.post(f'{url}/api/request-challenge')
data = json.loads(r.content)
print(f"Challenge : {data['encrypted_challenge']}")
challenge = data['encrypted_challenge']
try :
challenge_hash = try_decrypt(key_1, challenge)
valid_key = key_1
except :
challenge_hash = try_decrypt(key_2, challenge)
valid_key = key_2
print(f"valid key : {valid_key}")
print(f"challenge hash found : {challenge_hash}")
# send challenge response
packet = b'flag'
iv = os.urandom(16)
cipher = AES.new(valid_key, AES.MODE_CBC, iv)
encrypted_packet = cipher.encrypt(pad(packet, 16))
encrypted_packet = be(iv+encrypted_packet)
r = requests.post(f'{url}/api/dashboard', json = {'challenge': challenge_hash, 'packet_data': encrypted_packet})
data = json.loads(r.content)
encrypted_flag = data['packet_data']
flag = decrypt_packet(valid_key, encrypted_flag)
print(f"FLAG : {flag}")
FLAG: HTB{good_job_in_alpha_testing_our...}
Retex
C'est une mise en pratique d'un échange de clé diffie-hellman que je connaissais déjà mais c'est toujours sympa de pouvoir tester ce que ca donne rééellement.
Lien(s) utile(s)
- https://cryptohack.org/courses/modular/
- https://fr.wikipedia.org/wiki/Arithm%C3%A9tique_modulaire
- https://fr.wikipedia.org/wiki/%C3%89change_de_cl%C3%A9s_Diffie-Hellman
