How to intercept headers with handle_stream and continue processing other RequestHandlers?

Ricardo Abuchaim

May 26, 2024, 9:27:11 PM
to Tornado Web Server
Hello everybody,

I have a Tornado application and I need to intercept the headers in real time and take some actions. So far so good; my problem is that after intercepting the headers I don't know how to make the application continue with its normal flow. In other words, in the example below, how do I get the application to leave handle_stream and reach the APIHandler?

When I run this server and make the request with curl ("curl http://127.0.0.1:8000/api/example"), I never get a response; the request just hangs while the server sits in run_forever().

If I comment out the "while True" block in handle_stream, everything works, but then I can't intercept the headers in real time.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio, typing
import tornado.gen, tornado.httpserver, tornado.iostream, tornado.log, tornado.web

def debug(text):
    print(f'\033[91;1m>>>\033[0m \033[38;2;0;255;0m{text}\033[0m')

class APIHandler(tornado.web.RequestHandler):
    def get(self, addr):
        self.write(f"APIHandler - {self.request.path}\n")

class CustomHTTPServer(tornado.httpserver.HTTPServer):
    @tornado.gen.coroutine
    def handle_stream(self, stream: tornado.iostream.IOStream, address: typing.Tuple[str, int]):
        # Read and log header lines one at a time until the blank line that ends the headers.
        while True:
            try:
                header_data = yield stream.read_until(b"\r\n")
                if header_data == b'\r\n':  # an empty line marks the end of the headers
                    break
                debug(header_data.decode().strip())
            except tornado.iostream.StreamClosedError:
                break
        # Hand the stream to the normal HTTP machinery; by this point the header
        # bytes have already been consumed, which is exactly the problem described above.
        super().handle_stream(stream, address)

def make_app():
    return tornado.web.Application([(r"/api/(.*)", APIHandler)],
                                   debug=True,autoreload=True)

async def main():
    app = make_app()
    http_server = CustomHTTPServer(app, xheaders=True)
    http_server.listen(8000)
    print("*" * 40)
    print("Starting... port 8000")
    print("- " * 20)
    await asyncio.Event().wait()

if __name__ == "__main__":
    tornado.log.enable_pretty_logging()
    asyncio.run(main())


I researched the issues a lot and studied the HAProxy protocol example, but nothing helped me; I am unable to intercept the headers in real time and then continue with the application. RequestHandler.prepare() and HTTPMessageDelegate.headers_received() do not work for me, because I need a stream of the headers as they arrive.

I know I got stuck because I already read that data from the stream; how do I put it back? And if I do a stream.write(header_data), I get an HTTP/0.9 error in curl (lol).

Thanks for any help!


Ben Darnell

Jun 4, 2024, 8:42:01 PM
to python-...@googlegroups.com
The problem is that super().handle_stream expects a stream that is positioned at the beginning of the HTTP data, but you've already read past that point. It's not clear what your ultimate goal is, but I'd recommend trying to do this within the HTTPServerConnectionDelegate/HTTPMessageDelegate interfaces instead of modifying the HTTPServer. By injecting your own HTTPMessageDelegate you can see the HTTP headers and then pass them on to the regular handler (or not). 
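
Roughly, something like this (an untested sketch; the class names here are just for illustration):

import tornado.httpserver
import tornado.httputil

class InspectingDelegate(tornado.httputil.HTTPMessageDelegate):
    """Wraps the message delegate that HTTPServer would normally use."""
    def __init__(self, inner):
        self._inner = inner

    def headers_received(self, start_line, headers):
        # Inspect (or reject) the request here, then pass it on as usual.
        print(start_line, list(headers.get_all()))
        return self._inner.headers_received(start_line, headers)

    def data_received(self, chunk):
        return self._inner.data_received(chunk)

    def finish(self):
        self._inner.finish()

    def on_connection_close(self):
        self._inner.on_connection_close()

class InspectingHTTPServer(tornado.httpserver.HTTPServer):
    def start_request(self, server_conn, request_conn):
        # HTTPServer is itself an HTTPServerConnectionDelegate; wrap the
        # delegate it creates for each incoming request.
        return InspectingDelegate(super().start_request(server_conn, request_conn))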

If that doesn't meet your needs, you have a couple of other options, but neither is appealing. First, you could create a new IOStream object (on a local socket pair), pass it to super().handle_stream, then copy data back and forth between the outside socket connection (including the header data you set aside) and the internal stream. That's kind of annoying to code up and has some performance overhead but it uses only documented interfaces. The other option is to reach into IOStream's internals to "unread" the headers. This was the path I took in my HTTP/2 implementation at https://github.com/bdarnell/tornado_http2/blob/2cd0a6b45d5cd1c7145ca769202104c2c656a85f/tornado_http2/server.py#L83-L89
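
The core of the "unread" trick is just pushing the consumed bytes back onto the stream's private read buffer, roughly like this (it reaches into undocumented internals, so treat the linked code as the reference and expect it to need adjustment between Tornado versions):

def unread(stream: tornado.iostream.IOStream, data: bytes) -> None:
    # Prepend the already-consumed bytes to the front of the private read buffer
    # and keep the size bookkeeping consistent. These attributes may change.
    stream._read_buffer[:0] = data
    stream._read_buffer_size += len(data)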

-Ben


Ricardo Abuchaim

Jun 5, 2024, 4:41:43 PM
to Tornado Web Server
Thank you very much for your attention Ben!

What I intend to do is intercept the headers in real time to detect a possible Slowloris attack on my application. In this attack, the offender slowly sends a bunch of headers and holds the connection open.

It's a very simple API server and the request data is tiny, just a "GET /api/1.2.3.4" call, but even with a low max_header_size, a low max_buffer_size, and a server timeout of 0.2 seconds, the server is affected and the number of requests per second drops drastically.

With NGINX in front, the data does not reach the application, but the connections are still consumed, so my intention is to intercept the headers, analyze them, locate the origin (X-Forwarded-For or not) and, if necessary, block the IP in a firewall.

The offender is always disconnected by a timeout and then another connection comes in... more than 1000 connections from different IPs are opened. I had some success with this method, but I was unable to continue the normal flow of processing the web.RequestHandlers; now I understand the issue of the "stream that is positioned at the beginning of the HTTP data".

I also had success replacing iostream.IOStream._maybe_add_error_listener() with my own function: it is called when the connection is closed without a complete "\r\n\r\n", and there I managed to get the entire header of the attack (though not in real time), inspect it and identify the source. But if the offender discovers my limits (max_header_size, for example) and starts sending "\r\n\r\n" before reaching max_header_size, he can escape my analysis.

I thought it was possible to use handle_stream() to "non-intrusively" read the headers.

To see it for yourself, install the attack tool with "pip install ptdos" and attack with "ptdos -a slowloris -d 10 -dst 127.0.0.1 -dp 8000 -sq 1000"; you will see output like the one below:

>>> CLOSED connection 45809 - 34 headers with 512 bytes ('127.0.0.1', 47074) [0.105831532's]
['User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:97.0) Gecko/20100101 Firefox/97.0', 'Accept-language: en-US,en', 'Connection: Keep-Alive', 'X-a: fdGz', 'X-a: PiFd', 'X-a: NsD5', 'X-a: ZrmY', 'X-a: 70oB', 'X-a: nihG', 'X-a: tBeU', 'X-a: 1CgJ', 'X-a: S37B', 'X-a: HBVU', 'X-a: SOCv', 'X-a: goBE', 'X-a: haY3', 'X-a: sUmx', 'X-a: PW45', 'X-a: MrVs', 'X-a: JJLO', 'X-a: kM6E', 'X-a: iTVB', 'X-a: W0nl', 'X-a: sdd8', 'X-a: AEKM', 'X-a: q518', 'X-a: dQHV', 'X-a: 6WeI', 'X-a: CulU', 'X-a: 6TTm', 'X-a: G9m8', 'X-a: iian', 'X-a: U4rW', 'X-a: AKWu']
>>> CLOSED connection 46011 - 10 headers with 256 bytes ('127.0.0.1', 48906) [0.074380679's]
['User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:98.0) Gecko/20100101 Firefox/98.0', 'Accept-language: en-US,fr-CA', 'Connection: Keep-Alive', 'X-a: 7QTf', 'X-a: ToCE', 'X-a: TXet', 'X-a: T0Ug', 'X-a: OxvD', 'X-a: j97A', 'X-a: grYb']
>>> CLOSED connection 45810 - 30 headers with 512 bytes ('127.0.0.1', 47078) [0.106003490's]
['User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.74 Safari/537.36', 'Accept-language: en-US,en', 'Connection: Keep-Alive', 'X-a: 6a4y', 'X-a: oQlQ', 'X-a: 7Lzi', 'X-a: gc5K', 'X-a: FzCD', 'X-a: kpSs', 'X-a: TpCW', 'X-a: XdGc', 'X-a: yTvs', 'X-a: OFb5', 'X-a: bsfT', 'X-a: txhV', 'X-a: XT9Z', 'X-a: VbDS', 'X-a: IsjM', 'X-a: ELwE', 'X-a: y4DR', 'X-a: B13s', 'X-a: h1Du', 'X-a: DeLz', 'X-a: rwLA', 'X-a: bPf7', 'X-a: 2yZW', 'X-a: ja53', 'X-a: 9TD5', 'X-a: yeeu', 'X-a: jEVt']
>>> CLOSED connection 45994 - 9 headers with 256 bytes ('127.0.0.1', 48770) [0.075165867's]
['User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:95.0) Gecko/20100101 Firefox/95.0', 'Accept-language: da, en-GB;q=0.8, en;q=0.7', 'Connection: Keep-Alive', 'X-a: Mxgr', 'X-a: L7Hi', 'X-a: Lv6n', 'X-a: SEcC', 'X-a: 6Vxn', 'X-a: zHPn']
>>> CLOSED connection 45811 - 30 headers with 512 bytes ('127.0.0.1', 47082) [0.106148996's]
['User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.143 Safari/537.36', 'Accept-language: da, en-GB;q=0.8, en;q=0.7', 'Connection: Keep-Alive', 'X-a: erOW', 'X-a: RMsJ', 'X-a: XGUW', 'X-a: se1n', 'X-a: DURm', 'X-a: 45ai', 'X-a: y8t8', 'X-a: GVP1', 'X-a: 4WYx', 'X-a: I3vW', 'X-a: MYFV', 'X-a: 49Jq', 'X-a: Imij', 'X-a: 3IPZ', 'X-a: 44oe', 'X-a: IVah', 'X-a: poJk', 'X-a: ZYfo', 'X-a: z7So', 'X-a: pBzQ', 'X-a: 775l', 'X-a: Wma0', 'X-a: mYbW', 'X-a: 41MH', 'X-a: jYe3', 'X-a: 5pNE', 'X-a: cIJm']

This is how I replaced the _maybe_add_error_listener() function:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys, asyncio, typing, datetime, itertools, time
import tornado.gen, tornado.httpserver, tornado.ioloop, tornado.iostream, tornado.log, tornado.web

class MyAtomicCounter:
    # Simple lock-free counter built on itertools.count; reading .value itself
    # advances _counter, so _counter_access compensates for those extra reads.
    def __init__(self, start_number: int = 0):
        self._counter = itertools.count(start_number)
        self._counter_access = itertools.count()
    def incr(self):
        return next(self._counter)
    @property
    def value(self):
        return next(self._counter) - next(self._counter_access)

class myClassIOStream(tornado.iostream.IOStream):
    def _maybe_add_error_listener(self) -> None:
        # When the connection goes idle/closes with data still sitting in the read
        # buffer, log the header lines received so far and close the stream.
        if len(self._read_buffer) > 0:
            # [1:-1] ignores the REQUEST START LINE and the LAST LINE (empty line)
            a_list = [str(item.decode()) for item in self._read_buffer.split(b"\r\n")][1:-1]
            debug(f"CLOSED connection {self.conn_number} - {len(a_list)} headers with {len(self._read_buffer)} bytes {self.address} [{'%.9f'%(time.monotonic()-self.start_time)}'s]")
            print(a_list)
            self.close()
        # The rest mirrors the stock IOStream._maybe_add_error_listener() logic.
        if self._state is None or self._state == tornado.ioloop.IOLoop.ERROR:
            if (
                not self.closed()
                and self._read_buffer_size == 0
                and self._close_callback is not None
            ):
                self._add_io_state(tornado.ioloop.IOLoop.READ)

# Monkey-patch the override onto the stock IOStream class so every connection uses it.
tornado.iostream.IOStream._maybe_add_error_listener = myClassIOStream._maybe_add_error_listener

def debug(text):
    sys.stdout.write(f'\033[91;1m>>>\033[0m \033[38;2;0;255;0m{text}\033[0m'+'\n')
    sys.stdout.flush()

class APIHandler(tornado.web.RequestHandler):
    def get(self, addr):
        # debug("APIHandler")
        self.write(f"APIHandler - {self.request.path}\n")

class CustomHTTPServer(tornado.httpserver.HTTPServer):
    total_conn_counter = MyAtomicCounter()
    @tornado.gen.coroutine
    def handle_stream(self, stream: tornado.iostream.IOStream, address: typing.Tuple[str, int]):
        # Tag each incoming stream with a start time, connection number and peer
        # address, then hand it to the normal HTTP machinery untouched.
        stream.start_time = time.monotonic()
        stream.conn_number = self.total_conn_counter.incr()
        stream.address = address
        super().handle_stream(stream, address)

def make_app():
    return tornado.web.Application([(r"/api/(.*)", APIHandler)],
                                   debug=False,autoreload=True)

async def main():
    global http_server
    app = make_app()
    http_server = CustomHTTPServer(app, xheaders=True, no_keep_alive=True, max_header_size=256, max_buffer_size=512)
    http_server.listen(8000,backlog=10000,reuse_port=True)
    print("*" * 80)
    print(f"Listening on port 8000 - {datetime.datetime.now()}")
    print("- " * 40)
    await asyncio.Event().wait()

if __name__ == "__main__":
    # tornado.log.enable_pretty_logging()
    asyncio.run(main())


Can you suggest another "more elegant" solution? lol

Ben Darnell

Jun 6, 2024, 9:41:40 AM
to python-...@googlegroups.com
1. It feels like you're doing this in the wrong place if you need to parse the X-Forwarded-For header - that header is being added by some proxy, and that proxy is in a better position to detect and handle this pattern (For example, it can see if the request is trickling in with one header per packet, instead of receiving headers in as few packets as possible like in nearly all legitimate traffic. Proxies that add headers will necessarily change packet boundaries and will usually batch things back up). In general I'd strongly recommend using a proxy for DoS defense instead of trying to do it in tornado - Python is relatively inefficient and a python process can get overwhelmed by abusive traffic pretty easily compared to something like nginx. 
2. If you do need to do abuse detection in the backend like this before headers come in, it's better to use the HAProxy protocol instead of the X-Forwarded-For header. This doesn't require rewinding the stream and is in theory easier to implement in Tornado (a rough sketch follows below this list).
3. Slowloris attacks work primarily because there are expensive resources (such as threads) being held as long as the connection is open, even while it's just waiting for the next header packet. That's not as much of an issue in Tornado; waiting connections are cheap. Instead I think what you're seeing is the effect of just processing the number of packets involved in python. (If I'm reading your logs correctly each connection is sending about 27 header packets in 100ms - not very slow). Consider other kinds of DoS attacks too; I'm not sure that slowloris should be your biggest concern. 
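
For point 2, a rough sketch of what consuming a (v1) PROXY protocol line in handle_stream could look like; it assumes the proxy in front is configured to always send the PROXY header, and the parsing here is deliberately minimal:

import tornado.httpserver
import tornado.iostream

class ProxyProtocolHTTPServer(tornado.httpserver.HTTPServer):
    async def handle_stream(self, stream, address):
        try:
            # A v1 PROXY line such as b"PROXY TCP4 203.0.113.7 10.0.0.1 54321 8000\r\n"
            # arrives before any HTTP data and is at most 107 bytes long.
            line = await stream.read_until(b"\r\n", max_bytes=107)
        except (tornado.iostream.StreamClosedError, tornado.iostream.UnsatisfiableReadError):
            return
        parts = line.decode("ascii", "replace").split()
        if len(parts) >= 3 and parts[0] == "PROXY":
            client_ip = parts[2]  # the real client address as reported by the proxy
            # ...check client_ip against your blocklist here and stream.close() if abusive...
        # The PROXY line is not part of the HTTP request, so the stream is now
        # positioned at the start of the HTTP data and the normal flow takes over.
        super().handle_stream(stream, address)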

-Ben
