From 919707c4fd409d3201c6fe247fbce709d45e5444 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Wed, 10 Jun 2015 20:26:35 +0100 Subject: [PATCH 01/12] Scope out state machine classes --- hyper/state/__init__.py | 0 hyper/state/h2.py | 84 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+) create mode 100644 hyper/state/__init__.py create mode 100644 hyper/state/h2.py diff --git a/hyper/state/__init__.py b/hyper/state/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/hyper/state/h2.py b/hyper/state/h2.py new file mode 100644 index 00000000..03e8f49b --- /dev/null +++ b/hyper/state/h2.py @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +""" +hyper/state/h2 +~~~~~~~~~~~~~~ + +An implementation of a HTTP/2 state machine. + +The purpose of this is to trial the effectiveness of an explicit +state-machine-based approach to HTTP/2 in Python. Ideally, if this succeeds, +it will be pulled out into its own library and act as the core of a general +Python HTTP/2 implementation that can be plugged into, for example, Twisted. +""" +from enum import Enum + + +class StreamState(Enum): + IDLE = 0 + CONTINUATION = 1 + RESERVED_REMOTE = 2 + RESERVED_LOCAL = 3 + RESERVED_REMOTE_CONT = 4 + RESERVED_LOCAL_CONT = 5 + OPEN = 6 + HALF_CLOSED_REMOTE = 7 + HALF_CLOSED_LOCAL = 8 + CLOSED = 9 + + +class H2Stream(object): + """ + A single HTTP/2 stream state machine. + + This stream object implements basically the state machine described in + RFC 7540 section 5.1, with some extensions. The state machine as described + in that RFC does not include state transitions associated with CONTINUATION + frames. To formally handle those frames in the state machine process, we + extend the number of states to include continuation sent/received states. + """ + # For the sake of clarity, we reproduce the RFC 7540 state machine here: + # + # +--------+ + # send PP | | recv PP + # ,--------| idle |--------. + # / | | \ + # v +--------+ v + # +----------+ | +----------+ + # | | | send H / | | + # ,------| reserved | | recv H | reserved |------. + # | | (local) | | | (remote) | | + # | +----------+ v +----------+ | + # | | +--------+ | | + # | | recv ES | | send ES | | + # | send H | ,-------| open |-------. | recv H | + # | | / | | \ | | + # | v v +--------+ v v | + # | +----------+ | +----------+ | + # | | half | | | half | | + # | | closed | | send R / | closed | | + # | | (remote) | | recv R | (local) | | + # | +----------+ | +----------+ | + # | | | | | + # | | send ES / | recv ES / | | + # | | send R / v send R / | | + # | | recv R +--------+ recv R | | + # | send R / `----------->| |<-----------' send R / | + # | recv R | closed | recv R | + # `----------------------->| |<----------------------' + # +--------+ + # + # send: endpoint sends this frame + # recv: endpoint receives this frame + # + # H: HEADERS frame (with implied CONTINUATIONs) + # PP: PUSH_PROMISE frame (with implied CONTINUATIONs) + # ES: END_STREAM flag + # R: RST_STREAM frame + # + # Note that we add two extra states after reserved local/remote and one + # extra state after idle, accounting for the fact that continuation frames + # exist. The transitions from those states occur on the receipt of either + # END_HEADERS (transition to open or half-closed, depending on source) or + # RST_STREAM (transition immediately to closed). This adds substantial + # complexity, but c'est la vie. + pass From 29c1c5820389678e125e441d8376c3b517d8054c Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Wed, 10 Jun 2015 20:32:50 +0100 Subject: [PATCH 02/12] Init --- hyper/state/h2.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/hyper/state/h2.py b/hyper/state/h2.py index 03e8f49b..aed5ea6e 100644 --- a/hyper/state/h2.py +++ b/hyper/state/h2.py @@ -35,6 +35,9 @@ class H2Stream(object): in that RFC does not include state transitions associated with CONTINUATION frames. To formally handle those frames in the state machine process, we extend the number of states to include continuation sent/received states. + + :param stream_id: The stream ID of this stream. This is stored primarily + for logging purposes. """ # For the sake of clarity, we reproduce the RFC 7540 state machine here: # @@ -81,4 +84,6 @@ class H2Stream(object): # END_HEADERS (transition to open or half-closed, depending on source) or # RST_STREAM (transition immediately to closed). This adds substantial # complexity, but c'est la vie. - pass + def __init__(self, stream_id): + self.state = StreamState.IDLE + self.stream_id = stream_id From cc9ccfdf69d8bf08c96b16ede7ad7a1785f28310 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Wed, 10 Jun 2015 21:56:50 +0100 Subject: [PATCH 03/12] Concretely populate the state machine --- hyper/state/h2.py | 120 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 111 insertions(+), 9 deletions(-) diff --git a/hyper/state/h2.py b/hyper/state/h2.py index aed5ea6e..ce015d83 100644 --- a/hyper/state/h2.py +++ b/hyper/state/h2.py @@ -13,17 +13,43 @@ from enum import Enum +class ProtocolError(Exception): + """ + An action was attempted in violation of the HTTP/2 protocol. + """ + + class StreamState(Enum): IDLE = 0 - CONTINUATION = 1 - RESERVED_REMOTE = 2 - RESERVED_LOCAL = 3 - RESERVED_REMOTE_CONT = 4 - RESERVED_LOCAL_CONT = 5 - OPEN = 6 - HALF_CLOSED_REMOTE = 7 - HALF_CLOSED_LOCAL = 8 - CLOSED = 9 + CONTINUATION_LOCAL = 1 + CONTINATION_REMOTE = 2 + RESERVED_REMOTE = 3 + RESERVED_LOCAL = 4 + RESERVED_REMOTE_CONT = 5 + RESERVED_LOCAL_CONT = 6 + OPEN = 7 + HALF_CLOSED_REMOTE = 8 + HALF_CLOSED_LOCAL = 9 + CLOSED = 10 + + +class StreamInputs(Enum): + SEND_HEADERS = 0 + SEND_CONTINUATION = 1 + SEND_PUSH_PROMISE = 2 + SEND_END_HEADERS = 3 + SEND_RST_STREAM = 4 + SEND_DATA = 5 + SEND_WINDOW_UPDATE = 6 + SEND_END_STREAM = 7 + RECV_HEADERS = 8 + RECV_CONTINUATION = 9 + RECV_PUSH_PROMISE = 10 + RECV_END_HEADERS = 11 + RECV_RST_STREAM = 12 + RECV_DATA = 13 + RECV_WINDOW_UPDATE = 14 + RECV_END_STREAM = 15 class H2Stream(object): @@ -84,6 +110,82 @@ class H2Stream(object): # END_HEADERS (transition to open or half-closed, depending on source) or # RST_STREAM (transition immediately to closed). This adds substantial # complexity, but c'est la vie. + # + # The _transitions dictionary contains a mapping of tuples of + # (state, input) to tuples of (side_effect_function, end_state). This map + # contains all allowed transitions: anything not in this map is invalid + # and immediately causes a transition to ``closed``. + _transitions = { + # State: idle + (StreamState.IDLE, StreamInputs.SEND_HEADERS): (None, StreamState.CONTINUATION_LOCAL), + (StreamState.IDLE, StreamInputs.RECV_HEADERS): (None, StreamState.CONTINATION_REMOTE), + (StreamState.IDLE, StreamInputs.SEND_PUSH_PROMISE): (None, StreamState.RESERVED_LOCAL_CONT), + (StreamState.IDLE, StreamInputs.RECV_PUSH_PROMISE): (None, StreamState.RESERVED_REMOTE_CONT), + + # State: sent headers + (StreamState.CONTINUATION_LOCAL, StreamInputs.SEND_END_HEADERS): (None, StreamState.OPEN), + (StreamState.CONTINUATION_LOCAL, StreamInputs.SEND_RST_STREAM): (None, StreamState.CLOSED), + (StreamState.CONTINUATION_LOCAL, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), + + # State: received headers + (StreamState.CONTINUATION_REMOTE, StreamInputs.RECV_END_HEADERS): (None, StreamState.OPEN), + (StreamState.CONTINUATION_REMOTE, StreamInputs.SEND_RST_STREAM): (None, StreamState.CLOSED), + (StreamState.CONTINUATION_REMOTE, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), + + # State: reserved local + (StreamState.RESERVED_LOCAL, StreamInputs.SEND_HEADERS): (None, StreamState.RESERVED_LOCAL_CONT), + (StreamState.RESERVED_LOCAL, StreamInputs.SEND_RST_STREAM): (None, StreamState.CLOSED), + (StreamState.RESERVED_LOCAL, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), + + # State: reserved remote + (StreamState.RESERVED_REMOTE, StreamInputs.RECV_HEADERS): (None, StreamState.RESERVED_REMOTE_CONT), + (StreamState.RESERVED_REMOTE, StreamInputs.SEND_RST_STREAM): (None, StreamState.CLOSED), + (StreamState.RESERVED_REMOTE, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), + + # State: reserved local, sent headers + (StreamState.RESERVED_LOCAL_CONT, StreamInputs.SEND_END_HEADERS): (None, StreamState.HALF_CLOSED_REMOTE), + (StreamState.RESERVED_LOCAL_CONT, StreamInputs.SEND_RST_STREAM): (None, StreamState.CLOSED), + (StreamState.RESERVED_LOCAL_CONT, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), + + # State: reserved remote, received headers + (StreamState.RESERVED_REMOTE_CONT, StreamInputs.RECV_END_HEADERS): (None, StreamState.HALF_CLOSED_LOCAL), + (StreamState.RESERVED_REMOTE_CONT, StreamInputs.SEND_RST_STREAM): (None, StreamState.CLOSED), + (StreamState.RESERVED_REMOTE_CONT, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), + + # State: open + (StreamState.OPEN, StreamInputs.SEND_END_STREAM): (None, StreamState.HALF_CLOSED_LOCAL), + (StreamState.OPEN, StreamInputs.RECV_END_STREAM): (None, StreamState.HALF_CLOSED_REMOTE), + (StreamState.OPEN, StreamInputs.SEND_RST_STREAM): (None, StreamState.CLOSED), + (StreamState.OPEN, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), + + # State: half-closed remote + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_END_STREAM): (None, StreamState.CLOSED), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_RST_STREAM): (None, StreamState.CLOSED), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), + + # State: half-closed local + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_END_STREAM): (None, StreamState.CLOSED), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_RST_STREAM): (None, StreamState.CLOSED), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), + } + def __init__(self, stream_id): self.state = StreamState.IDLE self.stream_id = stream_id + + def process_input(self, input_): + """ + Process a specific input in the state machine. + """ + if not isinstance(input_, StreamInputs): + raise ValueError("Input must be an instance of StreamInputs") + + try: + func, target_state = self._transitions[(self.state, input_)] + except KeyError: + self.state = StreamState.CLOSED + raise ProtocolError( + "Invalid input %s in state %s", input_, self.state + ) + else: + return func() From 5b971836b545c38416b0a65485e80c90df6fa61d Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Wed, 10 Jun 2015 22:05:03 +0100 Subject: [PATCH 04/12] Actually perform state transitions --- hyper/state/h2.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hyper/state/h2.py b/hyper/state/h2.py index ce015d83..8c0b86ab 100644 --- a/hyper/state/h2.py +++ b/hyper/state/h2.py @@ -188,4 +188,6 @@ def process_input(self, input_): "Invalid input %s in state %s", input_, self.state ) else: - return func() + self.state = target_state + if func is not None: + return func() From 0ebc397f6cf556f3e486e99be509dc5538339c92 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Wed, 10 Jun 2015 22:07:49 +0100 Subject: [PATCH 05/12] Feels weird without this pass statement --- hyper/state/h2.py | 1 + 1 file changed, 1 insertion(+) diff --git a/hyper/state/h2.py b/hyper/state/h2.py index 8c0b86ab..64868fdc 100644 --- a/hyper/state/h2.py +++ b/hyper/state/h2.py @@ -17,6 +17,7 @@ class ProtocolError(Exception): """ An action was attempted in violation of the HTTP/2 protocol. """ + pass class StreamState(Enum): From 9dbd518d60bd62dc2819263a96a25f1099f493b4 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Wed, 10 Jun 2015 22:15:04 +0100 Subject: [PATCH 06/12] WINDOW_UPDATE frame transitions --- hyper/state/h2.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/hyper/state/h2.py b/hyper/state/h2.py index 64868fdc..4bd162e2 100644 --- a/hyper/state/h2.py +++ b/hyper/state/h2.py @@ -125,47 +125,65 @@ class H2Stream(object): # State: sent headers (StreamState.CONTINUATION_LOCAL, StreamInputs.SEND_END_HEADERS): (None, StreamState.OPEN), + (StreamState.CONTINUATION_LOCAL, StreamInputs.SEND_WINDOW_UPDATE): (None, StreamState.CONTINUATION_LOCAL), + (StreamState.CONTINUATION_LOCAL, StreamInputs.RECV_WINDOW_UPDATE): (None, StreamState.CONTINUATION_LOCAL), (StreamState.CONTINUATION_LOCAL, StreamInputs.SEND_RST_STREAM): (None, StreamState.CLOSED), (StreamState.CONTINUATION_LOCAL, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), # State: received headers (StreamState.CONTINUATION_REMOTE, StreamInputs.RECV_END_HEADERS): (None, StreamState.OPEN), + (StreamState.CONTINUATION_REMOTE, StreamInputs.SEND_WINDOW_UPDATE): (None, StreamState.CONTINUATION_REMOTE), + (StreamState.CONTINUATION_REMOTE, StreamInputs.RECV_WINDOW_UPDATE): (None, StreamState.CONTINUATION_REMOTE), (StreamState.CONTINUATION_REMOTE, StreamInputs.SEND_RST_STREAM): (None, StreamState.CLOSED), (StreamState.CONTINUATION_REMOTE, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), # State: reserved local (StreamState.RESERVED_LOCAL, StreamInputs.SEND_HEADERS): (None, StreamState.RESERVED_LOCAL_CONT), + (StreamState.RESERVED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE): (None, StreamState.RESERVED_LOCAL), + (StreamState.RESERVED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE): (None, StreamState.RESERVED_LOCAL), (StreamState.RESERVED_LOCAL, StreamInputs.SEND_RST_STREAM): (None, StreamState.CLOSED), (StreamState.RESERVED_LOCAL, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), # State: reserved remote (StreamState.RESERVED_REMOTE, StreamInputs.RECV_HEADERS): (None, StreamState.RESERVED_REMOTE_CONT), + (StreamState.RESERVED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE): (None, StreamState.RESERVED_REMOTE), + (StreamState.RESERVED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE): (None, StreamState.RESERVED_REMOTE), (StreamState.RESERVED_REMOTE, StreamInputs.SEND_RST_STREAM): (None, StreamState.CLOSED), (StreamState.RESERVED_REMOTE, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), # State: reserved local, sent headers (StreamState.RESERVED_LOCAL_CONT, StreamInputs.SEND_END_HEADERS): (None, StreamState.HALF_CLOSED_REMOTE), + (StreamState.RESERVED_LOCAL_CONT, StreamInputs.SEND_WINDOW_UPDATE): (None, StreamState.RESERVED_LOCAL_CONT), + (StreamState.RESERVED_LOCAL_CONT, StreamInputs.RECV_WINDOW_UPDATE): (None, StreamState.RESERVED_LOCAL_CONT), (StreamState.RESERVED_LOCAL_CONT, StreamInputs.SEND_RST_STREAM): (None, StreamState.CLOSED), (StreamState.RESERVED_LOCAL_CONT, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), # State: reserved remote, received headers (StreamState.RESERVED_REMOTE_CONT, StreamInputs.RECV_END_HEADERS): (None, StreamState.HALF_CLOSED_LOCAL), + (StreamState.RESERVED_REMOTE_CONT, StreamInputs.SEND_WINDOW_UPDATE): (None, StreamState.RESERVED_REMOTE_CONT), + (StreamState.RESERVED_REMOTE_CONT, StreamInputs.RECV_WINDOW_UPDATE): (None, StreamState.RESERVED_REMOTE_CONT), (StreamState.RESERVED_REMOTE_CONT, StreamInputs.SEND_RST_STREAM): (None, StreamState.CLOSED), (StreamState.RESERVED_REMOTE_CONT, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), # State: open (StreamState.OPEN, StreamInputs.SEND_END_STREAM): (None, StreamState.HALF_CLOSED_LOCAL), (StreamState.OPEN, StreamInputs.RECV_END_STREAM): (None, StreamState.HALF_CLOSED_REMOTE), + (StreamState.OPEN, StreamInputs.SEND_WINDOW_UPDATE): (None, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.RECV_WINDOW_UPDATE): (None, StreamState.OPEN), (StreamState.OPEN, StreamInputs.SEND_RST_STREAM): (None, StreamState.CLOSED), (StreamState.OPEN, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), # State: half-closed remote (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_END_STREAM): (None, StreamState.CLOSED), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE): (None, StreamState.HALF_CLOSED_REMOTE), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE): (None, StreamState.HALF_CLOSED_REMOTE), (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_RST_STREAM): (None, StreamState.CLOSED), (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), # State: half-closed local (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_END_STREAM): (None, StreamState.CLOSED), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE): (None, StreamState.HALF_CLOSED_LOCAL), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE): (None, StreamState.HALF_CLOSED_LOCAL), (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_RST_STREAM): (None, StreamState.CLOSED), (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), } From 68a813059da6dbd85aa780fea9830863f6046989 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Wed, 10 Jun 2015 22:17:51 +0100 Subject: [PATCH 07/12] Send/receive continuation frames --- hyper/state/h2.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/hyper/state/h2.py b/hyper/state/h2.py index 4bd162e2..4a0937bc 100644 --- a/hyper/state/h2.py +++ b/hyper/state/h2.py @@ -124,6 +124,7 @@ class H2Stream(object): (StreamState.IDLE, StreamInputs.RECV_PUSH_PROMISE): (None, StreamState.RESERVED_REMOTE_CONT), # State: sent headers + (StreamState.CONTINUATION_LOCAL, StreamInputs.SEND_CONTINUATION): (None, StreamState.CONTINUATION_LOCAL), (StreamState.CONTINUATION_LOCAL, StreamInputs.SEND_END_HEADERS): (None, StreamState.OPEN), (StreamState.CONTINUATION_LOCAL, StreamInputs.SEND_WINDOW_UPDATE): (None, StreamState.CONTINUATION_LOCAL), (StreamState.CONTINUATION_LOCAL, StreamInputs.RECV_WINDOW_UPDATE): (None, StreamState.CONTINUATION_LOCAL), @@ -131,6 +132,7 @@ class H2Stream(object): (StreamState.CONTINUATION_LOCAL, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), # State: received headers + (StreamState.CONTINUATION_REMOTE, StreamInputs.RECV_CONTINUATION): (None, StreamState.CONTINUATION_REMOTE), (StreamState.CONTINUATION_REMOTE, StreamInputs.RECV_END_HEADERS): (None, StreamState.OPEN), (StreamState.CONTINUATION_REMOTE, StreamInputs.SEND_WINDOW_UPDATE): (None, StreamState.CONTINUATION_REMOTE), (StreamState.CONTINUATION_REMOTE, StreamInputs.RECV_WINDOW_UPDATE): (None, StreamState.CONTINUATION_REMOTE), @@ -152,6 +154,7 @@ class H2Stream(object): (StreamState.RESERVED_REMOTE, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), # State: reserved local, sent headers + (StreamState.RESERVED_LOCAL_CONT, StreamInputs.SEND_CONTINUATION): (None, StreamState.RESERVED_LOCAL_CONT), (StreamState.RESERVED_LOCAL_CONT, StreamInputs.SEND_END_HEADERS): (None, StreamState.HALF_CLOSED_REMOTE), (StreamState.RESERVED_LOCAL_CONT, StreamInputs.SEND_WINDOW_UPDATE): (None, StreamState.RESERVED_LOCAL_CONT), (StreamState.RESERVED_LOCAL_CONT, StreamInputs.RECV_WINDOW_UPDATE): (None, StreamState.RESERVED_LOCAL_CONT), @@ -159,6 +162,7 @@ class H2Stream(object): (StreamState.RESERVED_LOCAL_CONT, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), # State: reserved remote, received headers + (StreamState.RESERVED_REMOTE_CONT, StreamInputs.RECV_CONTINUATION): (None, StreamState.RESERVED_REMOTE_CONT), (StreamState.RESERVED_REMOTE_CONT, StreamInputs.RECV_END_HEADERS): (None, StreamState.HALF_CLOSED_LOCAL), (StreamState.RESERVED_REMOTE_CONT, StreamInputs.SEND_WINDOW_UPDATE): (None, StreamState.RESERVED_REMOTE_CONT), (StreamState.RESERVED_REMOTE_CONT, StreamInputs.RECV_WINDOW_UPDATE): (None, StreamState.RESERVED_REMOTE_CONT), From d060d5971292232258cf4de8505cb76a53757059 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Wed, 10 Jun 2015 22:19:41 +0100 Subject: [PATCH 08/12] Handle DATA frames --- hyper/state/h2.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/hyper/state/h2.py b/hyper/state/h2.py index 4a0937bc..c237d7be 100644 --- a/hyper/state/h2.py +++ b/hyper/state/h2.py @@ -170,6 +170,8 @@ class H2Stream(object): (StreamState.RESERVED_REMOTE_CONT, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), # State: open + (StreamState.OPEN, StreamInputs.SEND_DATA): (None, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.RECV_DATA): (None, StreamState.OPEN), (StreamState.OPEN, StreamInputs.SEND_END_STREAM): (None, StreamState.HALF_CLOSED_LOCAL), (StreamState.OPEN, StreamInputs.RECV_END_STREAM): (None, StreamState.HALF_CLOSED_REMOTE), (StreamState.OPEN, StreamInputs.SEND_WINDOW_UPDATE): (None, StreamState.OPEN), @@ -178,6 +180,7 @@ class H2Stream(object): (StreamState.OPEN, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), # State: half-closed remote + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_DATA): (None, StreamState.HALF_CLOSED_REMOTE), (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_END_STREAM): (None, StreamState.CLOSED), (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE): (None, StreamState.HALF_CLOSED_REMOTE), (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE): (None, StreamState.HALF_CLOSED_REMOTE), @@ -185,6 +188,7 @@ class H2Stream(object): (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), # State: half-closed local + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_DATA): (None, StreamState.HALF_CLOSED_LOCAL), (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_END_STREAM): (None, StreamState.CLOSED), (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE): (None, StreamState.HALF_CLOSED_LOCAL), (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE): (None, StreamState.HALF_CLOSED_LOCAL), From d85b47d9a327956254cba526c564e86abb76d382 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Fri, 12 Jun 2015 21:50:58 +0100 Subject: [PATCH 09/12] Remove continuations from state machine --- hyper/state/h2.py | 103 ++++++++++++++-------------------------------- 1 file changed, 30 insertions(+), 73 deletions(-) diff --git a/hyper/state/h2.py b/hyper/state/h2.py index c237d7be..16a77065 100644 --- a/hyper/state/h2.py +++ b/hyper/state/h2.py @@ -22,35 +22,29 @@ class ProtocolError(Exception): class StreamState(Enum): IDLE = 0 - CONTINUATION_LOCAL = 1 - CONTINATION_REMOTE = 2 - RESERVED_REMOTE = 3 - RESERVED_LOCAL = 4 - RESERVED_REMOTE_CONT = 5 - RESERVED_LOCAL_CONT = 6 - OPEN = 7 - HALF_CLOSED_REMOTE = 8 - HALF_CLOSED_LOCAL = 9 - CLOSED = 10 + RESERVED_REMOTE = 1 + RESERVED_LOCAL = 2 + OPEN = 3 + HALF_CLOSED_REMOTE = 4 + HALF_CLOSED_LOCAL = 5 + CLOSED = 6 class StreamInputs(Enum): SEND_HEADERS = 0 - SEND_CONTINUATION = 1 - SEND_PUSH_PROMISE = 2 - SEND_END_HEADERS = 3 - SEND_RST_STREAM = 4 - SEND_DATA = 5 - SEND_WINDOW_UPDATE = 6 - SEND_END_STREAM = 7 - RECV_HEADERS = 8 - RECV_CONTINUATION = 9 - RECV_PUSH_PROMISE = 10 - RECV_END_HEADERS = 11 - RECV_RST_STREAM = 12 - RECV_DATA = 13 - RECV_WINDOW_UPDATE = 14 - RECV_END_STREAM = 15 + SEND_PUSH_PROMISE = 1 + SEND_END_HEADERS = 2 + SEND_RST_STREAM = 3 + SEND_DATA = 4 + SEND_WINDOW_UPDATE = 5 + SEND_END_STREAM = 6 + RECV_HEADERS = 7 + RECV_PUSH_PROMISE = 8 + RECV_END_HEADERS = 9 + RECV_RST_STREAM = 10 + RECV_DATA = 11 + RECV_WINDOW_UPDATE = 12 + RECV_END_STREAM = 13 class H2Stream(object): @@ -58,10 +52,7 @@ class H2Stream(object): A single HTTP/2 stream state machine. This stream object implements basically the state machine described in - RFC 7540 section 5.1, with some extensions. The state machine as described - in that RFC does not include state transitions associated with CONTINUATION - frames. To formally handle those frames in the state machine process, we - extend the number of states to include continuation sent/received states. + RFC 7540 section 5.1. :param stream_id: The stream ID of this stream. This is stored primarily for logging purposes. @@ -105,12 +96,10 @@ class H2Stream(object): # ES: END_STREAM flag # R: RST_STREAM frame # - # Note that we add two extra states after reserved local/remote and one - # extra state after idle, accounting for the fact that continuation frames - # exist. The transitions from those states occur on the receipt of either - # END_HEADERS (transition to open or half-closed, depending on source) or - # RST_STREAM (transition immediately to closed). This adds substantial - # complexity, but c'est la vie. + # For the purposes of this state machine we treat HEADERS and their + # associated CONTINUATION frames as a single jumbo frame. The protocol + # allows/requires this by preventing other frames from being interleved in + # between HEADERS/CONTINUATION frames. # # The _transitions dictionary contains a mapping of tuples of # (state, input) to tuples of (side_effect_function, end_state). This map @@ -118,57 +107,25 @@ class H2Stream(object): # and immediately causes a transition to ``closed``. _transitions = { # State: idle - (StreamState.IDLE, StreamInputs.SEND_HEADERS): (None, StreamState.CONTINUATION_LOCAL), - (StreamState.IDLE, StreamInputs.RECV_HEADERS): (None, StreamState.CONTINATION_REMOTE), - (StreamState.IDLE, StreamInputs.SEND_PUSH_PROMISE): (None, StreamState.RESERVED_LOCAL_CONT), - (StreamState.IDLE, StreamInputs.RECV_PUSH_PROMISE): (None, StreamState.RESERVED_REMOTE_CONT), - - # State: sent headers - (StreamState.CONTINUATION_LOCAL, StreamInputs.SEND_CONTINUATION): (None, StreamState.CONTINUATION_LOCAL), - (StreamState.CONTINUATION_LOCAL, StreamInputs.SEND_END_HEADERS): (None, StreamState.OPEN), - (StreamState.CONTINUATION_LOCAL, StreamInputs.SEND_WINDOW_UPDATE): (None, StreamState.CONTINUATION_LOCAL), - (StreamState.CONTINUATION_LOCAL, StreamInputs.RECV_WINDOW_UPDATE): (None, StreamState.CONTINUATION_LOCAL), - (StreamState.CONTINUATION_LOCAL, StreamInputs.SEND_RST_STREAM): (None, StreamState.CLOSED), - (StreamState.CONTINUATION_LOCAL, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), - - # State: received headers - (StreamState.CONTINUATION_REMOTE, StreamInputs.RECV_CONTINUATION): (None, StreamState.CONTINUATION_REMOTE), - (StreamState.CONTINUATION_REMOTE, StreamInputs.RECV_END_HEADERS): (None, StreamState.OPEN), - (StreamState.CONTINUATION_REMOTE, StreamInputs.SEND_WINDOW_UPDATE): (None, StreamState.CONTINUATION_REMOTE), - (StreamState.CONTINUATION_REMOTE, StreamInputs.RECV_WINDOW_UPDATE): (None, StreamState.CONTINUATION_REMOTE), - (StreamState.CONTINUATION_REMOTE, StreamInputs.SEND_RST_STREAM): (None, StreamState.CLOSED), - (StreamState.CONTINUATION_REMOTE, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), + (StreamState.IDLE, StreamInputs.SEND_HEADERS): (None, StreamState.OPEN), + (StreamState.IDLE, StreamInputs.RECV_HEADERS): (None, StreamState.OPEN), + (StreamState.IDLE, StreamInputs.SEND_PUSH_PROMISE): (None, StreamState.RESERVED_LOCAL), + (StreamState.IDLE, StreamInputs.RECV_PUSH_PROMISE): (None, StreamState.RESERVED_REMOTE), # State: reserved local - (StreamState.RESERVED_LOCAL, StreamInputs.SEND_HEADERS): (None, StreamState.RESERVED_LOCAL_CONT), + (StreamState.RESERVED_LOCAL, StreamInputs.SEND_HEADERS): (None, StreamState.HALF_CLOSED_REMOTE), (StreamState.RESERVED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE): (None, StreamState.RESERVED_LOCAL), (StreamState.RESERVED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE): (None, StreamState.RESERVED_LOCAL), (StreamState.RESERVED_LOCAL, StreamInputs.SEND_RST_STREAM): (None, StreamState.CLOSED), (StreamState.RESERVED_LOCAL, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), # State: reserved remote - (StreamState.RESERVED_REMOTE, StreamInputs.RECV_HEADERS): (None, StreamState.RESERVED_REMOTE_CONT), + (StreamState.RESERVED_REMOTE, StreamInputs.RECV_HEADERS): (None, StreamState.HALF_CLOSED_LOCAL), (StreamState.RESERVED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE): (None, StreamState.RESERVED_REMOTE), (StreamState.RESERVED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE): (None, StreamState.RESERVED_REMOTE), (StreamState.RESERVED_REMOTE, StreamInputs.SEND_RST_STREAM): (None, StreamState.CLOSED), (StreamState.RESERVED_REMOTE, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), - # State: reserved local, sent headers - (StreamState.RESERVED_LOCAL_CONT, StreamInputs.SEND_CONTINUATION): (None, StreamState.RESERVED_LOCAL_CONT), - (StreamState.RESERVED_LOCAL_CONT, StreamInputs.SEND_END_HEADERS): (None, StreamState.HALF_CLOSED_REMOTE), - (StreamState.RESERVED_LOCAL_CONT, StreamInputs.SEND_WINDOW_UPDATE): (None, StreamState.RESERVED_LOCAL_CONT), - (StreamState.RESERVED_LOCAL_CONT, StreamInputs.RECV_WINDOW_UPDATE): (None, StreamState.RESERVED_LOCAL_CONT), - (StreamState.RESERVED_LOCAL_CONT, StreamInputs.SEND_RST_STREAM): (None, StreamState.CLOSED), - (StreamState.RESERVED_LOCAL_CONT, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), - - # State: reserved remote, received headers - (StreamState.RESERVED_REMOTE_CONT, StreamInputs.RECV_CONTINUATION): (None, StreamState.RESERVED_REMOTE_CONT), - (StreamState.RESERVED_REMOTE_CONT, StreamInputs.RECV_END_HEADERS): (None, StreamState.HALF_CLOSED_LOCAL), - (StreamState.RESERVED_REMOTE_CONT, StreamInputs.SEND_WINDOW_UPDATE): (None, StreamState.RESERVED_REMOTE_CONT), - (StreamState.RESERVED_REMOTE_CONT, StreamInputs.RECV_WINDOW_UPDATE): (None, StreamState.RESERVED_REMOTE_CONT), - (StreamState.RESERVED_REMOTE_CONT, StreamInputs.SEND_RST_STREAM): (None, StreamState.CLOSED), - (StreamState.RESERVED_REMOTE_CONT, StreamInputs.RECV_RST_STREAM): (None, StreamState.CLOSED), - # State: open (StreamState.OPEN, StreamInputs.SEND_DATA): (None, StreamState.OPEN), (StreamState.OPEN, StreamInputs.RECV_DATA): (None, StreamState.OPEN), From 952079ffd889e092df3ffa58a8ac0fe7bd5d9366 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Sat, 20 Jun 2015 18:53:17 +0100 Subject: [PATCH 10/12] Further state machine work --- hyper/state/h2.py | 160 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 147 insertions(+), 13 deletions(-) diff --git a/hyper/state/h2.py b/hyper/state/h2.py index 16a77065..9f987f47 100644 --- a/hyper/state/h2.py +++ b/hyper/state/h2.py @@ -12,6 +12,11 @@ """ from enum import Enum +from ..packages.hyperframe.frame import ( + HeadersFrame, ContinuationFrame, DataFrame, WindowUpdateFrame, + RstStreamFrame +) + class ProtocolError(Exception): """ @@ -33,21 +38,19 @@ class StreamState(Enum): class StreamInputs(Enum): SEND_HEADERS = 0 SEND_PUSH_PROMISE = 1 - SEND_END_HEADERS = 2 - SEND_RST_STREAM = 3 - SEND_DATA = 4 - SEND_WINDOW_UPDATE = 5 - SEND_END_STREAM = 6 - RECV_HEADERS = 7 - RECV_PUSH_PROMISE = 8 - RECV_END_HEADERS = 9 - RECV_RST_STREAM = 10 - RECV_DATA = 11 - RECV_WINDOW_UPDATE = 12 - RECV_END_STREAM = 13 + SEND_RST_STREAM = 2 + SEND_DATA = 3 + SEND_WINDOW_UPDATE = 4 + SEND_END_STREAM = 5 + RECV_HEADERS = 6 + RECV_PUSH_PROMISE = 7 + RECV_RST_STREAM = 8 + RECV_DATA = 9 + RECV_WINDOW_UPDATE = 10 + RECV_END_STREAM = 11 -class H2Stream(object): +class H2StreamStateMachine(object): """ A single HTTP/2 stream state machine. @@ -175,3 +178,134 @@ def process_input(self, input_): self.state = target_state if func is not None: return func() + + +class H2Stream(object): + """ + A low-level HTTP/2 stream object. This handles building and receiving + frames and maintains per-stream state. + + This wraps a HTTP/2 Stream state machine implementation, ensuring that + frames can only be sent/received when the stream is in a valid state. + Attempts to create frames that cannot be sent will raise a + ``ProtocolError``. + """ + def __init__(self, stream_id): + self.state_machine = H2StreamStateMachine(stream_id) + self.stream_id = stream_id + self.max_outbound_frame_size = None + self.max_inbound_frame_size = None + + def send_headers(self, headers, encoder, end_stream=False): + """ + Returns a list of HEADERS/CONTINUATION frames to emit as either headers + or trailers. + """ + # Because encoding headers makes an irreversible change to the header + # compression context, we make the state transition *first*. + self.state_machine.process_input(StreamInputs.SEND_HEADERS) + encoded_headers = encoder.encode(headers) + + # Slice into blocks of max_outbound_frame_size. Be careful with this: + # it only works right because we never send padded frames or priority + # information on the frames. Revisit this if we do. + header_blocks = ( + encoded_headers[i:i+self.max_outbound_frame_size] + for i in range( + 0, len(encoded_headers), self.max_outbound_frame_size + ) + ) + + frames = [] + hf = HeadersFrame(self.stream_id) + hf.data = header_blocks[0] + frames.append(hf) + + for block in header_blocks[1:]: + cf = ContinuationFrame(self.stream_id) + cf.data = block + frames.append(cf) + + frames[-1].flags.add('END_HEADERS') + + if end_stream: + # Not a bug: the END_STREAM flag is valid on the initial HEADERS + # frame, not the CONTINUATION frames that follow. + self.state_machine.process_input(StreamInputs.END_STREAM) + frames[0].flags.add('END_STREAM') + + return frames + + def send_data(self, data, end_stream=False): + """ + Prepare a data frame. Optionally end the stream. + """ + # TODO: Automatically split into multiple frames. + # TODO: Something something flow control. + self.state_machine.process_input(StreamInputs.SEND_DATA) + df = DataFrame(self.stream_id) + df.data = data + + if end_stream: + df.flags.add('END_STREAM') + + return df + + def end_stream(self): + """ + End a stream without sending data. + """ + self.state_machine.process_input(StreamInputs.END_STREAM) + df = DataFrame(self.stream_id) + df.flags.add('END_STREAM') + return df + + def increase_flow_control_window(self, increment): + """ + Increase the size of the flow control window for the remote side. + """ + self.state_machine.process_input(StreamInputs.SEND_WINDOW_UPDATE) + wuf = WindowUpdateFrame(self.stream_id) + wuf.window_increment = increment + return wuf + + def receive_headers(self, end_stream): + """ + Receive a set of headers (or trailers). + """ + self.state_machine.process_input(StreamInputs.RECV_HEADERS) + + if end_stream: + self.state_machine.process_input(StreamInputs.END_STREAM) + + def receive_data(self, end_stream): + """ + Receive some data. + """ + self.state_machine.process_input(StreamInputs.RECV_DATA) + + if end_stream: + self.state_machine.process_input(StreamInputs.END_STREAM) + + def receive_window_update(self, increment): + """ + Handle a WINDOW_UPDATE increment. + """ + self.state_machine.process_input(StreamInputs.RECV_WINDOW_UPDATE) + # TODO: Actually increment flow control! + + def close(self, error_code=0): + """ + Close the stream locally. Reset the stream with an error code. + """ + self.state_machine.process_input(StreamInputs.SEND_RST_STREAM) + + rsf = RstStreamFrame(self.stream_id) + rsf.error_code = error_code + return rsf + + def stream_reset(self): + """ + Handle a stream being reset remotely. + """ + self.state_machine.process_input(StreamInputs.RECV_RST_STREAM From edad060fa21717ad6f1126024e90bf6ea2928fed Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Wed, 24 Jun 2015 14:36:19 +0100 Subject: [PATCH 11/12] Fixup basic state errors --- hyper/state/h2.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hyper/state/h2.py b/hyper/state/h2.py index 9f987f47..146c1414 100644 --- a/hyper/state/h2.py +++ b/hyper/state/h2.py @@ -231,7 +231,7 @@ def send_headers(self, headers, encoder, end_stream=False): if end_stream: # Not a bug: the END_STREAM flag is valid on the initial HEADERS # frame, not the CONTINUATION frames that follow. - self.state_machine.process_input(StreamInputs.END_STREAM) + self.state_machine.process_input(StreamInputs.SEND_END_STREAM) frames[0].flags.add('END_STREAM') return frames @@ -308,4 +308,4 @@ def stream_reset(self): """ Handle a stream being reset remotely. """ - self.state_machine.process_input(StreamInputs.RECV_RST_STREAM + self.state_machine.process_input(StreamInputs.RECV_RST_STREAM) From e0b4efc067b12a26852847d4bc8057a4ca4b3dbd Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Wed, 24 Jun 2015 14:36:26 +0100 Subject: [PATCH 12/12] Cannot index a generator --- hyper/state/h2.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hyper/state/h2.py b/hyper/state/h2.py index 146c1414..188da801 100644 --- a/hyper/state/h2.py +++ b/hyper/state/h2.py @@ -209,12 +209,12 @@ def send_headers(self, headers, encoder, end_stream=False): # Slice into blocks of max_outbound_frame_size. Be careful with this: # it only works right because we never send padded frames or priority # information on the frames. Revisit this if we do. - header_blocks = ( + header_blocks = [ encoded_headers[i:i+self.max_outbound_frame_size] for i in range( 0, len(encoded_headers), self.max_outbound_frame_size ) - ) + ] frames = [] hf = HeadersFrame(self.stream_id)