Skip to content

Hybrid Unifier


Description

Write Up: Tyron
Créateur: r4sti
Difficulté: very easy
Points: 975
Format du flag: HTB{printable}

Enoncé

  • Dans les profondeurs d'une ancienne bibliothèque, un vieux manuscrit renferme la clé d'un pouvoir invisible. Les érudits qui osaient en percer les secrets devaient d'abord échanger une série de symboles cryptés, formant ainsi un lien que personne ne pouvait rompre. Au fur et à mesure qu'ils établissaient leur connexion, des couches de protection s'enroulaient autour d'eux comme des chaînes invisibles. Mais alors que le dernier chiffre était établi, une prise de conscience glaçante s'est produite : le lien qu'ils avaient forgé était désormais lié à quelque chose de bien plus sombre, quelque chose qui les observait depuis l'ombre.

  • In the depths of an ancient library, an old manuscript held the key to an unseen power. Scholars who dared to unlock its secrets would first exchange a series of encrypted symbols, forming a bond no one could break. As they secured their connection, layers of protection wrapped around them like invisible chains. But as the final cipher was set, a chilling realization struck—the connection they forged was now bound to something far darker, something watching from the shadows.


Pièce(s) jointe(s):


Solution détaillée

Dans ce challenge on a une API à disposition avec plusieurs point d'accès qui permettent entre autres d'initier une session "sécurisée", générer une clé privée et intéragir avec le serveur par messages chiffrés.

Lecture du code source

app.py
from flask import Flask
import os

app = Flask(__name__)

if __name__ == "__main__":
    from views import bp
    app.register_blueprint(bp)
    app.run(host="0.0.0.0", port=1337, debug=False)

Ce bout de code est lancé pour initialiser le serveur, on apprend donc qu'il s'agit d'une application Flask qui va interpréter le blueprint bp du fichier views.py, y a pas grand chose à comprendre à part que le fichier views.py va nous etre utile.

views.py
from app import app
from flask import Blueprint, render_template, request, session, make_response, redirect, url_for, flash, jsonify
from crypto.session import SecureSession

bp = Blueprint('views', __name__)
session = SecureSession(384)

# Step 1. Request the Diffie Hellman parameters
@bp.route('/api/request-session-parameters', methods=['POST'])
def get_session_parameters():
    return jsonify({'g': hex(session.g), 'p': hex(session.p)})

# Step 2. Initialize a secure session with the server by sending your Diffie Hellman public key
@bp.route('/api/init-session', methods=['POST'])
def init_session():
    if session.initialized:
        return jsonify({'status_code': 400, 'error': 'A secure session has already been established.'})

    data = request.json
    if 'client_public_key' not in data:
        return jsonify({'status_code': 400, 'error': 'You need to send the client public key.'})

    client_public_key = data['client_public_key']
    session.establish_session_key(client_public_key)
    session.initialized = True
    return jsonify({'status_code': 200, 'success': 'A secure session was successfully established. There will be E2E encryption for the rest of the communication.', 'server_public_key': hex(session.server_public_key)})

# Step 3. Request an encrypted challenge.
@bp.route('/api/request-challenge', methods=['POST'])
def request_challenge():
    if not session.initialized:
        return jsonify({'status_code': 400, 'error': 'A secure server-client session has to be established first.'})

    return jsonify({'encrypted_challenge': session.get_encrypted_challenge().decode()})

# Step 4. Authenticate by responding to the challenge and send an encrypted packet with 'flag' as action to get the flag. 
@bp.route('/api/dashboard', methods=['POST'])
def access_secret():
    if not session.initialized:
        return jsonify({'status_code': 400, 'error': 'A secure server-client session has to be established first.'})

    data = request.json

    if 'challenge' not in data:
        return jsonify({'status_code': 400, 'error': 'You need to send the hash of the challenge.'})

    if 'packet_data' not in data:
        return jsonify({'status_code': 400, 'error': 'Empty packet.'})

    challenge_hash = data['challenge']
    if not session.validate_challenge(challenge_hash):
            return jsonify({'status_code': 401, 'error': 'Invalid challenge! Something wrong? You can visit /request-challenge to get a new challenge!'})

    encrypted_packet = data['packet_data']

    packet = session.decrypt_packet(encrypted_packet)
    if not 'packet_data' in packet:
        return jsonify({'status_code': 400, 'error': packet['error']})

    action = packet['packet_data']
    if action == 'flag':
        return jsonify(session.encrypt_packet(open('/flag.txt').read()))
    elif action == 'about':
        return jsonify(session.encrypt_packet('[+] Welcome to my custom API! You are currently Alpha testing my new E2E protocol.\nTo get the flag, all you have to do is to follow the protocol as intended. For any bugs, feel free to contact us :-] !'))
    else:
        return jsonify(session.encrypt_packet('[!] Unknown action.'))

C'est ce code qui va définir les différents point d'accès de l'API. Un fichier README.pdf nous est fourni pour tout nous expliquer:

Seule les requetes POST sont autorisées.

  • /api/request-session-parameters

Vous pouvez utiliser ce point d'accès pour obtenir les paramètres Diffie Hellman. Vous en aurez besoin pour l'initialisation de la session sécurisée.

  • /api/init-session

Vous pouvez utiliser ce point d'accès pour établir une session sécurisée avec le serveur. Vous recevez la clé publique Diffie Hellman du serveur et vous lui envoyez la vôtre.

  • /api/request-challenge

Vous pouvez utiliser ce point d'accès pour demander un défi chiffré au serveur. Il est chiffré avec la clé de session partagée. Ce défi est nécessaire pour l'authentification afin d'interagir avec le point de terminaison /api/dashboard.

  • /api/dashboard

Vous pouvez utiliser ce point d'accès pour envoyer l'action que vous souhaitez au serveur. Les actions disponibles pour le moment sont sont flag et about. Gardez à l'esprit que vous devez envoyer un paquet chiffré.

Les points d'accès /api/request-challenge et /api/dashboard ne sont accessibles qu'après l'établissement d'une session sécurisée. Pour cela, vous devez d'abord interagir avec le point de terminaison /api/init-session.

Pour intéragir avec l'API, on peut utiliser le module requests de python et envoyer des requetes POST:

URL = 'http://localhost:1337'
r = requests.post(f'{URL}/a-cool-endpoint', json={'key1': 'json', 'key2': 'data'})

Pour lire la réponse, on peut utiliser l'attribut content ou text:

print(r.content)

Enfin, le module sur lequel repose les fonctions cryptographique est session.py:

crypto/session.py
from base64 import b64encode as be, b64decode as bd
from Crypto.Util.number import getPrime, long_to_bytes as l2b
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from secrets import randbelow
from hashlib import sha256
import os, json

class SecureSession:
    def __init__(self, bits):
        self.bits = bits
        self.g = 2
        self.p = getPrime(self.bits)
        self.compute_server_public_key()
        self.reset_challenge()
        self.initialized = False

    def compute_server_public_key(self):
        self.a = randbelow(self.p)
        self.server_public_key = pow(self.g, self.a, self.p)

    def establish_session_key(self, client_public_key):
        key = pow(client_public_key, self.a, self.p)
        self.session_key = sha256(str(key).encode()).digest()

    def reset_challenge(self):
        self.challenge = os.urandom(24)

    def validate_challenge(self, challenge_hash):
        validated = challenge_hash == sha256(self.challenge).hexdigest()
        if validated:
            self.reset_challenge()
        return validated

    def encrypt_packet(self, packet):
        iv = os.urandom(16)
        cipher = AES.new(self.session_key, AES.MODE_CBC, iv)
        encrypted_packet = iv + cipher.encrypt(pad(packet.encode(), 16))
        return {'packet_data': be(encrypted_packet).decode()}

    def decrypt_packet(self, packet):
        decoded_packet = bd(packet.encode())
        iv = decoded_packet[:16]
        encrypted_packet = decoded_packet[16:]
        cipher = AES.new(self.session_key, AES.MODE_CBC, iv)
        try:
            decrypted_packet = unpad(cipher.decrypt(encrypted_packet), 16)
            packet_data = decrypted_packet.decode()
        except:
            return {'error': 'Malformed packet.'}

        return {'packet_data': packet_data}

    def get_encrypted_challenge(self):
        iv = os.urandom(16)
        cipher = AES.new(self.session_key, AES.MODE_CBC, iv)
        encrypted_challenge = iv + cipher.encrypt(pad(self.challenge, 16))
        return be(encrypted_challenge)

Reconnaissance

Au démarrage du serveur, un objet session est initialisé en spécifiant une longueur de 384 bits:

session.py (ligne 6)

session = SecureSession(384)

Ainsi, tant que le serveur n'est pas redémarré, les paramètres ne changeront pas.

Dans le fichier views.py il y a des commentaires avec écris step qui nous indique clairement la voie à suivre.

Etape 1

La première étape est de récupérer les paramètres du serveur :

Step 1. Récupérer les paramètres Diffie-Hellman

@bp.route('/api/request-session-parameters', methods=['POST'])
def get_session_parameters():
    return jsonify({'g': hex(session.g), 'p': hex(session.p)})

On peut envoyer une requete POST à l'endpoint /request-session-parameters pour récupérer les paramètres utilisés par serveur avec Burpsuite ou Python.

r = requests.post(f'{url}/api/init-session', json = {'client_public_key':my_pubkey})

# {"g":"0x2","p":"0xf4ac926dddb699f4cb03c005a9f6f89c7505ab5282a3557d2aaf41e822372edd69c2a9d4906a943340bfd87fc08f9743"}

Etape 2

La deuxième est d'envoyer ma clé pblique

Step 2. Initier une session Diffie-Hellman en envoyant une clé publique

@bp.route('/api/init-session', methods=['POST'])
def init_session():
    if session.initialized:
        return jsonify({'status_code': 400, 'error': 'A secure session has already been established.'})

    data = request.json
    if 'client_public_key' not in data:
        return jsonify({'status_code': 400, 'error': 'You need to send the client public key.'})

    client_public_key = data['client_public_key']
    session.establish_session_key(client_public_key)
    session.initialized = True
        return jsonify({'status_code': 200, 'success': 'A secure session was successfully established. There will be E2E encryption for the rest of the communication.', 'server_public_key': hex(session.server_public_key)})
  • Si une clé publique a déja été envoyée, une erreur est retournée (pas besoin d'aller regarder session.initialized en profondeur pour comprendre)
  • Si je contacte ce point d'accès sans envoyer de clé, une erreur est relevée.
  • Si tout se passe bien, la clé publique du serveur m'est renvoyée.

On peut regarder ce qu'il se passe à ce niveau session.establish_session_key(client_public_key). Car c'est là que va etre définie la clé qui serivra à chiffrer les communications:

crypto/session.py -> establish_session_key()
def establish_session_key(self, client_public_key):
    key = pow(client_public_key, self.a, self.p)
    self.session_key = sha256(str(key).encode()).digest()

Donc la clé de session est générée en élevant ma clé publique (notée B par convetion) à la puissance a modulo p : $ B^{a} \mod p $.

D'ailleurs, a est générée dans compute_server_public_key():

crypto/session.py -> compute_server_public_key()
def compute_server_public_key(self):
    self.a = randbelow(self.p)
    self.server_public_key = pow(self.g, self.a, self.p)

Donc a est aléatoire, et la clé publique du serveur (habituellement notée A) est calculée à partir de a : $ g^{a} \mod p $.

On est en plein dans un échange de clé Diffie-Hellman.

échange de clé diffie-hellman

Bob génère sa clé publique B et l'envoie à Alice. Alice génère sa clé publique A et l'envoie à Bob.

Les deux génèrent une clé partagée : $ B^{a} \equiv A^{b} = c \mod p $

Et c'est cette clé qui sera utilisée pour le chiffrement symmétrique.

Donc, je suis censé envoyer ma clé publique. Et générer une clé privée puis avec la clé publique que le serveur m'a donné, calculer la clé partagée.

Il s'agit juste de la mise ne pratique d'un échange de clé diffie-hellamn.

Etape 3

Step 3. Demander un challenge chiffré.
@bp.route('/api/request-challenge', methods=['POST'])
def request_challenge():
    if not session.initialized:
        return jsonify({'status_code': 400, 'error': 'A secure server-client session has to be established first.'})

    return jsonify({'encrypted_challenge': session.get_encrypted_challenge().decode()})

Nécessite que la session soit initiée, puis session.get_encrypted_challenge() est appelé:

crypto/session.py get_encrypted_challenge()
def get_encrypted_challenge(self):
    iv = os.urandom(16)
    cipher = AES.new(self.session_key, AES.MODE_CBC, iv)
    encrypted_challenge = iv + cipher.encrypt(pad(self.challenge, 16))
    return be(encrypted_challenge)
  • La variable challenge sera alignée avec du bourrage pour avoir une longueur de octets (la fonction pad())
  • puis chiffrée avec AES CBC et la clé de session et un iv aléatoire
  • l'IV sera ajouté au début du texte chiffré
  • le tout encodé en base64 et envoyé

Donc il suffit de decoder, recupérer l'IV, déchiffré le texte et on a le texte en clair.

Il faut noter que challenge est généré à l'initalisation de la session :

crypto/session.py -> SecureSession -> init()
class SecureSession:
def __init__(self, bits):
    self.bits = bits
    self.g = 2
    self.p = getPrime(self.bits)
    self.compute_server_public_key()
    self.reset_challenge()
    self.initialized = False

et reset_challenge :

crypto/session.py ->
def reset_challenge(self):
    self.challenge = os.urandom(24)

En fait c'est juste une chaine de 24 octets aléatoires.

Etape 4

Step 4 : S'authentifier en envoyant l'action flag ET le challenge déchiffré
@bp.route('/api/dashboard', methods=['POST'])
def access_secret():
    if not session.initialized:
        return jsonify({'status_code': 400, 'error': 'A secure server-client session has to be established first.'})

    data = request.json

    if 'challenge' not in data:
        return jsonify({'status_code': 400, 'error': 'You need to send the hash of the challenge.'})

    if 'packet_data' not in data:
        return jsonify({'status_code': 400, 'error': 'Empty packet.'})

    challenge_hash = data['challenge']
    if not session.validate_challenge(challenge_hash):
            return jsonify({'status_code': 401, 'error': 'Invalid challenge! Something wrong? You can visit /request-challenge to get a new challenge!'})

    encrypted_packet = data['packet_data']

    packet = session.decrypt_packet(encrypted_packet)
    if not 'packet_data' in packet:
        return jsonify({'status_code': 400, 'error': packet['error']})

    action = packet['packet_data']
    if action == 'flag':
        return jsonify(session.encrypt_packet(open('/flag.txt').read()))
    elif action == 'about':
        return jsonify(session.encrypt_packet('[+] Welcome to my custom API! You are currently Alpha testing my new E2E protocol.\nTo get the flag, all you have to do is to follow the protocol as intended. For any bugs, feel free to contact us :-] !'))
    else:
        return jsonify(session.encrypt_packet('[!] Unknown action.'))

Pour faire court:

  • on doit envoyer le payload à cet endpoint via un json
  • envoyer une entrée 'challenge' et une entrée 'packet_data'
  • la réussite du challenge sera vérifié à travers la fonction validate_challenge() (d'ailleurs, si le challenge est réussi, il est changé)
  • packet_data est censé etre chiffré, si la mauvaise clé est utilisée, on a une erreur, sinon l'action définie dans packet_data (flag ou about) sera exécutée
  • si l'action est flag, on récupère le flag

Exploitation

Il n'y a pas vraiment d'exploitation.

On réalise un échange de clé classique, on déchiffre ce que le serveur nous envoie et chiffre nos communications avec le serveur, c'est une sorte de TP.

Mais vu que j'avais pas compris, j'ai cherché une vulnérabilité (alors qu'il y en a pas) et j'ai définie ma clé publique B = $ p - 1 $.

Ce qui fait que lorsque la session_key est générée : le calcul est le suivant :

$ B^{a} \equiv (p-1)^{a} \equiv (p-1+p)^{a} \equiv -1^{a} \mod p $ qui vaut soit 1 soit -1.

Ainsi j'envoie une clé publique qui fait que je peux déviner la clé de session.

Cela est possible grace à l'arithmétique modulaire expliquée sur cryptohack

En résumé:

  • on récupère les paramètres
  • on initialise la session et génère la clé de session
  • on requete le challenge qu'on dechiffre
  • on recontacte le serveur avec le challenge chiffré et on recupère le flag
solve_hybrid_unifier.py
from base64 import b64encode as be, b64decode as bd
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from Crypto.Util.number import getPrime, long_to_bytes as l2b
from hashlib import sha256

import requests
import json
import os

def try_decrypt(key, challenge):

    challenge = bd(challenge)
    iv = challenge[:16]
    challenge = challenge[16:]
    cipher  = AES.new(key, AES.MODE_CBC, iv)
    decrypted = unpad(cipher.decrypt(challenge),16)

    return sha256(decrypted).hexdigest()

def decrypt_packet(key, packet):
    decoded_packet = bd(packet.encode())
    iv = decoded_packet[:16]
    encrypted_packet = decoded_packet[16:]
    cipher = AES.new(key, AES.MODE_CBC, iv)
    decrypted_packet = unpad(cipher.decrypt(encrypted_packet), 16)
    packet_data = decrypted_packet.decode()

    return packet_data


if __name__ == "__main__":
    url = "http://94.237.55.180:50582"
    r = requests.post(f'{url}/api/request-session-parameters')
    data = json.loads(r.content)
    print(f"Parameters : {data}")

    g = int(data["g"], 16)
    p = int(data["p"], 16)

    my_pubkey = p-1

    # init session

    r = requests.post(f'{url}/api/init-session', json = {'client_public_key':my_pubkey})
    data = json.loads(r.content)
    print(f"Session initialization : {data}")

    server_pubkey = int(data["server_public_key"], 16)

    key_1 = sha256(str(p-1).encode()).digest()
    key_2 = sha256(str(1).encode()).digest()

    #get challenge

    r = requests.post(f'{url}/api/request-challenge')
    data = json.loads(r.content)
    print(f"Challenge : {data['encrypted_challenge']}")
    challenge = data['encrypted_challenge'] 
    try :
        challenge_hash = try_decrypt(key_1, challenge)
        valid_key = key_1
    except : 
        challenge_hash = try_decrypt(key_2, challenge)
        valid_key = key_2

    print(f"valid key : {valid_key}")
    print(f"challenge hash found : {challenge_hash}") 

    # send challenge response
    packet = b'flag'
    iv = os.urandom(16)
    cipher  = AES.new(valid_key, AES.MODE_CBC, iv)
    encrypted_packet = cipher.encrypt(pad(packet, 16))
    encrypted_packet = be(iv+encrypted_packet)

    r = requests.post(f'{url}/api/dashboard', json = {'challenge': challenge_hash, 'packet_data': encrypted_packet})

    data = json.loads(r.content)
    encrypted_flag = data['packet_data']

    flag = decrypt_packet(valid_key, encrypted_flag)
    print(f"FLAG : {flag}")

FLAG: HTB{good_job_in_alpha_testing_our...}


Retex

C'est une mise en pratique d'un échange de clé diffie-hellman que je connaissais déjà mais c'est toujours sympa de pouvoir tester ce que ca donne rééellement.


Lien(s) utile(s)