Intercepting Twofish-encrypted HTTP traffic with mitmproxy

mitmproxy is an open-source, ncurses-based tool for intercepting HTTP(S) connections. This blog post covers a more advanced use of mitmproxy: using its scripting engine to modify passing traffic automatically. As an example, we have chosen the communication of a security solution that consists of a server backend and a connected endpoint agent. It uses Twofish encryption as an additional layer to protect data transmitted between the agent and the server component. This post describes how to replace a string in the original query with a modified value despite the encryption.

To redirect the traffic, we changed the agent's configuration so that it contacts mitmproxy, running as a reverse proxy, instead of the server component directly. To decrypt and modify the passing traffic, we developed a mitmproxy-compatible script that rewrites network traffic automatically.

It makes sense to concentrate the script's logic in one class that mitmproxy can load. This makes input arguments easier to handle and avoids hard-coding values. The general structure of the script looks like this:

import argparse
import codecs

from mitmproxy import http, ctx
from twofish import Twofish  # Used for decryption in the steps below


class Replacer:
    def __init__(self, twofish_psk, original_string, replaced_string):
        self.twofish_psk = twofish_psk
        self.original_string = original_string
        self.replaced_string = replaced_string

    def request(self, flow):
        # The request hook runs before the request is forwarded to the server
        original_query = flow.request.query["q"]
        modified_query = original_query  # Do decryption + modification here
        flow.request.query["q"] = modified_query


def start():
    parser = argparse.ArgumentParser()
    parser.add_argument("twofish_psk", type=str)
    parser.add_argument("original_string", type=str)
    parser.add_argument("replaced_string", type=str)
    args = parser.parse_args()
    return Replacer(args.twofish_psk, args.original_string, args.replaced_string)

Now start mitmproxy in reverse proxy mode.

#For mitmproxy >=v4:
mitmproxy --mode reverse:http://server
  
#For mitmproxy <v4:
mitmproxy -R http://server 

With mitmproxy running in reverse proxy mode and its address entered in the agent’s configuration file, mitmproxy captures all traffic from the agent to the server, along with the corresponding replies. In this example, the requests sent by the agent contain an encrypted payload in the query string:

PUT http://server?q=C973C3FEA34B642D1A6E9CF32EFA6E2D 
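
To make the structure of such a request concrete, the following minimal sketch uses only the Python standard library to pull the hex payload out of the example URL. The helper name extract_payload is hypothetical, and the parameter name q is taken from the example request above.

```python
from urllib.parse import urlsplit, parse_qs


def extract_payload(url, parameter="q"):
    """Return the hex payload from the given query parameter (hypothetical helper)."""
    query = parse_qs(urlsplit(url).query)
    return query[parameter][0]


payload = extract_payload("http://server?q=C973C3FEA34B642D1A6E9CF32EFA6E2D")
print(payload)       # the hex string carried in the query
print(len(payload))  # number of hex characters; two per byte
```

Since two hex characters encode one byte, the length of the payload already hints at how many bytes of ciphertext the agent sends.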

In this case, the pre-shared key (PSK) used for the Twofish encryption is known from the agent’s configuration file. For Twofish decryption in Python, the “twofish” package is available.

As a first step, the Twofish library has to be initialized with the correct key. The library expects the key as raw bytes, so the key has to be converted from its hex representation into binary format.

def request(self, flow):
    original_query = flow.request.query["q"]  # Hex-encoded ciphertext from the query string
    T = Twofish(codecs.decode(self.twofish_psk, "hex_codec"))  # Initialize the Twofish lib with the decoded key
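
As a quick check of the hex-to-binary step, this small sketch decodes the example PSK that appears in the command line later in this post, using only the standard library. It also shows bytes.fromhex, which should be an equivalent alternative to the codecs call.

```python
import codecs

psk_hex = "68756E74657232"  # example PSK from the command line in this post

key_via_codecs = codecs.decode(psk_hex, "hex_codec")  # approach used in the script
key_via_bytes = bytes.fromhex(psk_hex)                # equivalent standard-library call

print(key_via_codecs)
print(len(key_via_codecs), "bytes")
```

Both calls yield the same bytes object, which is what gets passed to the Twofish constructor.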

Twofish can be used with various cipher modes. Luckily, the agent-server communication uses ECB mode, which is insecure but very easy to implement: every 16-byte block is encrypted and decrypted independently with the same key. ECB-encrypted Twofish content therefore always has a length that is a multiple of 16 bytes, and for decryption the input is split into blocks of exactly 16 bytes.

def request(self, flow):
    original_query = flow.request.query["q"]  # Hex-encoded ciphertext from the query string
    T = Twofish(codecs.decode(self.twofish_psk, "hex_codec"))  # Initialize the Twofish lib with the decoded key

    query_chunks = [original_query[i:i+32] for i in range(0, len(original_query), 32)]  # Split query in chunks of 16 bytes (32 hex characters)
    decoded_chunks = [codecs.decode(chunk, "hex_codec") for chunk in query_chunks]  # Convert hex chunks to binary
    decrypted_query = b""
    for chunk in decoded_chunks:
        decrypted_query += T.decrypt(chunk)  # Decrypt the chunks using the PSK
    cleartext_query = decrypted_query.decode("utf-8", errors="ignore")  # Drop the \xff padding bytes during conversion
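
The chunking and the padding removal can be tried in isolation, without a Twofish key. The sketch below assumes the same conventions as the script (hex input, 16-byte blocks, 0xFF padding). The helper names split_hex_blocks and strip_padding are hypothetical, and the sample plaintext is made up for illustration.

```python
BLOCK_SIZE = 16  # Twofish block size in bytes


def split_hex_blocks(hex_string):
    """Split a hex string into 16-byte binary blocks (hypothetical helper)."""
    raw = bytes.fromhex(hex_string)
    if len(raw) % BLOCK_SIZE != 0:
        raise ValueError("ECB ciphertext must be a multiple of 16 bytes long")
    return [raw[i:i + BLOCK_SIZE] for i in range(0, len(raw), BLOCK_SIZE)]


def strip_padding(decrypted):
    """Drop 0xFF padding: 0xFF is never valid UTF-8, so errors="ignore" skips it."""
    return decrypted.decode("utf-8", errors="ignore")


blocks = split_hex_blocks("C973C3FEA34B642D1A6E9CF32EFA6E2D")
print(len(blocks), len(blocks[0]))

sample = b"user=alice" + b"\xff" * 6  # made-up plaintext padded to 16 bytes
print(strip_padding(sample))
```

Note that errors="ignore" silently drops any invalid byte sequence, not only padding. Where that matters, stripping trailing bytes with rstrip(b"\xff") before decoding would be a stricter alternative.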

The string can now be replaced in the cleartext query, and the whole process is run in reverse: the cleartext string is converted to bytes, chunked, padded with 0xFF bytes, encrypted, and hex-encoded again.

def request(self, flow):
    original_query = flow.request.query["q"]  # Hex-encoded ciphertext from the query string
    T = Twofish(codecs.decode(self.twofish_psk, "hex_codec"))  # Initialize the Twofish lib with the decoded key

    query_chunks = [original_query[i:i+32] for i in range(0, len(original_query), 32)]  # Split query in chunks of 16 bytes (32 hex characters)
    decoded_chunks = [codecs.decode(chunk, "hex_codec") for chunk in query_chunks]  # Convert hex chunks to binary
    decrypted_query = b""
    for chunk in decoded_chunks:
        decrypted_query += T.decrypt(chunk)  # Decrypt the chunks using the PSK
    cleartext_query = decrypted_query.decode("utf-8", errors="ignore")  # Drop the \xff padding bytes during conversion

    cleartext_query = cleartext_query.replace(self.original_string, self.replaced_string)  # str.replace returns a new string
    cleartext_query_bytes = cleartext_query.encode("utf-8")  # Generate binary from the string
    query_chunks = [cleartext_query_bytes[i:i+16] for i in range(0, len(cleartext_query_bytes), 16)]  # Binary chunks
    while len(query_chunks[-1]) < 16:
        query_chunks[-1] += b"\xFF"  # Insert the padding to achieve 16-byte blocks
    encrypted_query = b""
    for chunk in query_chunks:
        encrypted_query += T.encrypt(chunk)  # Encrypt the chunks
    hexed_query = encrypted_query.hex().upper()  # Generate an uppercase hex representation of the bytes
    flow.request.query["q"] = hexed_query  # Replace the original query
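
The complete round trip can also be condensed into one reusable function. The following sketch assumes a cipher object with encrypt and decrypt methods that operate on single 16-byte blocks, as they are used in the script above. So that the example runs without the Twofish package, it uses XorCipher, a deliberately insecure stand-in that is not Twofish. Both rewrite_payload and XorCipher are hypothetical names, and the sample payload is made up.

```python
BLOCK_SIZE = 16


class XorCipher:
    """Insecure stand-in for a block cipher (NOT Twofish), for illustration only."""

    def __init__(self, key):
        # Repeat the key until it covers one full block
        self.pad = (key * BLOCK_SIZE)[:BLOCK_SIZE]

    def encrypt(self, block):
        return bytes(a ^ b for a, b in zip(block, self.pad))

    decrypt = encrypt  # XOR is its own inverse


def rewrite_payload(cipher, hex_payload, original, replacement):
    """Decrypt an ECB hex payload, replace a string, and re-encrypt it."""
    raw = bytes.fromhex(hex_payload)
    blocks = [raw[i:i + BLOCK_SIZE] for i in range(0, len(raw), BLOCK_SIZE)]
    cleartext = b"".join(cipher.decrypt(b) for b in blocks)
    text = cleartext.decode("utf-8", errors="ignore")  # drops 0xFF padding
    text = text.replace(original, replacement)         # str.replace returns a new string
    data = text.encode("utf-8")
    data += b"\xff" * (-len(data) % BLOCK_SIZE)         # pad to a full block
    blocks = [data[i:i + BLOCK_SIZE] for i in range(0, len(data), BLOCK_SIZE)]
    return b"".join(cipher.encrypt(b) for b in blocks).hex().upper()


cipher = XorCipher(bytes.fromhex("68756E74657232"))
plain = b"action=query" + b"\xff" * 4  # made-up payload, padded to 16 bytes
original_hex = cipher.encrypt(plain).hex().upper()
modified_hex = rewrite_payload(cipher, original_hex, "query", "replacement")
print(len(original_hex), len(modified_hex))
```

With the real library, a Twofish instance created from the decoded PSK should be able to take the place of XorCipher. Because the replacement string is longer than the original, the modified payload grows by one block.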

The string was replaced, and the query has successfully been modified.

Tips and Tricks

To log a line for debugging purposes, mitmproxy exposes the ctx module. Logged strings are visible in the event console of mitmproxy.

from mitmproxy import ctx
ctx.log.error("This string will be logged")

The command line that loads the script and passes the PSK, the original string, and the replacement string is:

mitmproxy --mode reverse:http://server -s "script.py 68756E74657232 query replacement"
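
To see how the quoted argument string maps onto the three positional arguments that start() expects, the parsing can be reproduced outside of mitmproxy. This sketch assumes that the script receives everything after the script name as its arguments. build_parser is a hypothetical helper that mirrors the parser defined in start().

```python
import argparse
import shlex


def build_parser():
    """Mirror of the parser used in start() (hypothetical helper)."""
    parser = argparse.ArgumentParser()
    parser.add_argument("twofish_psk", type=str)
    parser.add_argument("original_string", type=str)
    parser.add_argument("replaced_string", type=str)
    return parser


script_spec = "script.py 68756E74657232 query replacement"
script_name, *script_args = shlex.split(script_spec)  # split like a shell would
args = build_parser().parse_args(script_args)
print(args.twofish_psk, args.original_string, args.replaced_string)
```

Strings that contain spaces would need their own quoting inside the argument string so that they are still parsed as a single positional argument.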

Who we are
The “Technology Scouting & Evaluation” (TSE) service identifies and evaluates promising IT security solutions. With this service, DCSO supports companies in staying ahead of a dynamic and ever-changing market. The centralized and unbiased evaluation process is supplemented with the experience of all community members.