Skip to content
3 changes: 2 additions & 1 deletion debian/rules
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/make -f

export PYBUILD_NAME=httoop
export PYBUILD_SYSTEM=pyproject

%:
dh $@ --with python2,python3 --buildsystem=pybuild
dh $@ --with python3 --buildsystem=pybuild
24 changes: 22 additions & 2 deletions httoop/header/conditional.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,28 @@ def __int__(self) -> int:

class _MatchElement:

def matches(self, value):
return self == value or self == '*'
def __eq__(self, other: Any) -> bool:
return self.value == other or self.value == '*'

def matches(self, etag):
return self == etag

def matches_etag(self, etag, *, strong: bool = True):
value = self.value

if value == '*':
return True

is_weak = value.startswith('W/')
if is_weak:
if strong:
return False
value = value[2:]

if not (value.startswith('"') and value.endswith('"')):
return False

return value[1:-1] == etag


class ETag(HeaderElement):
Expand Down
2 changes: 1 addition & 1 deletion httoop/messages/body.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ def __iter__(self) -> Iterator[Any]:

def __compose_chunked_iter(self, iterable):
for data in iterable:
if not data:
if not data: # pragma: no cover
continue
yield b'%x\r\n%s\r\n' % (len(data), data)
if self.trailer:
Expand Down
5 changes: 0 additions & 5 deletions httoop/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,4 @@ def __new__(mcs: type, name: str, bases: Any, dict_: dict[str, Any]) -> Any:
bases.remove(object)
bases.append(Semantic)

# python 2/3 unifying
if '__bool__' in dict_ or '__nonzero__' in dict_:
dict_.setdefault('__bool__', dict_.get('__nonzero__'))
dict_.setdefault('__nonzero__', dict_.get('__bool__'))

return super().__new__(mcs, name, tuple(bases), dict_)
7 changes: 1 addition & 6 deletions httoop/uri/uri.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,6 @@ def _unquote_host(self, host: bytes) -> str:
host = host[1:-1]
try:
host = inet_ntop(AF_INET6, inet_pton(AF_INET6, host.decode('ascii')))
if isinstance(host, bytes): # Python 2
host = host.decode('ascii')
return '[%s]' % (host, )
except (SocketError, UnicodeDecodeError):
# IPvFuture
Expand All @@ -275,10 +273,7 @@ def _unquote_host(self, host: bytes) -> str:
# IPv4
if all(x.isdigit() for x in host.split(b'.')):
try:
host = inet_ntop(AF_INET, inet_pton(AF_INET, host.decode('ascii')))
if isinstance(host, bytes): # Python 2
host = host.decode('ascii')
return host
return inet_ntop(AF_INET, inet_pton(AF_INET, host.decode('ascii')))
except (SocketError, UnicodeDecodeError):
raise InvalidURI(_('Invalid IPv4 address in URI.'))

Expand Down
3 changes: 2 additions & 1 deletion httoop/util.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
"""Utilities for python2/3 compatibility."""
"""Utilities for Python compatibility."""

from __future__ import annotations

import codecs
from typing import Any, Callable


try:
from typing import Self
except ImportError: # < Py 3.11
Expand Down
8 changes: 2 additions & 6 deletions httoop/version.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
from httoop.header import Server as __Server, UserAgent as __UserAgent
from httoop.messages import Protocol


if __import__('sys').version_info < (3, 5) and __import__('sys').version_info > (2, 8): # pragma: no cover
raise RuntimeError('httoop only supports >= python2.7 and >= python3.5!')

from httoop.header import Server as __Server, UserAgent as __UserAgent # noqa
from httoop.messages import Protocol # noqa

__version__ = '0.1.1'
UserAgentHeader = __UserAgent.parse(b'httoop/%s' % (__version__.encode(), ))
ServerHeader = __Server.parse(b'httoop/%s' % (__version__.encode(), ))
Expand Down
15 changes: 15 additions & 0 deletions tests/api/test_wsgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,19 @@ def application9(environ, start_response):
return []


def application_string_write(environ, start_response):
# it's probably invalid but we support it currently
write = start_response(OK, response_headers)
write(output.decode('ASCII'))
return []


def application_string_result(environ, start_response):
# it's probably invalid but we support it currently
start_response(OK, response_headers)
return output.decode('ASCII')


@pytest.mark.parametrize('application,output', [
(application1, output),
(application2, output),
Expand All @@ -81,6 +94,8 @@ def application9(environ, start_response):
(application7, output),
(application8, output + output),
(application9, b''),
(application_string_write, output),
(application_string_result, output),
])
def test_wsgi_success(application, output):
client = WSGIClient()
Expand Down
12 changes: 10 additions & 2 deletions tests/authentication/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,16 @@ def test_basic_authorization(headers):
auth = Authorization('Basic', {'username': 'admin', 'password': '12345'})
assert bytes(auth) == b'Basic YWRtaW46MTIzNDU='
headers.parse(b'Authorization: %s' % auth)
assert headers.element('Authorization').params['username'] == b'admin'
assert headers.element('Authorization').params['password'] == b'12345'
elem = headers.element('Authorization')
assert elem.params['username'] == b'admin'
assert elem.params['password'] == b'12345'
assert elem.scheme == 'basic'
assert elem.username == 'admin'
assert elem.password == '12345'
elem.username = 'test'
elem.password = 'test'
assert elem.username == 'test'
assert elem.password == 'test'


@pytest.mark.parametrize('invalid', (b'foo', b'Zm9v', 'föo'.encode('latin1')))
Expand Down
10 changes: 9 additions & 1 deletion tests/authentication/test_digest.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,15 @@ def test_digest_authorization(headers):
qop="auth",
nc="00000001"'''
headers.parse(auth_bytes)
assert auth.params == headers.element('Authorization').params
elem = headers.element('Authorization')
assert elem.params == auth.params
assert elem.scheme == 'digest'
assert elem.username == 'Mufasa'
assert elem.password is None
elem.username = 'test'
elem.password = 'test'
assert elem.username == 'test'
assert elem.password is None


def test_digest_authorization_no_qop(headers):
Expand Down
54 changes: 54 additions & 0 deletions tests/headers/test_conditional.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import pytest


def test_conditional_header(headers):
headers.parse(b'Last-Modified: Mon, 15 Jun 2020 21:18:41 GMT')
headers.parse(b'If-Modified-Since: Mon, 15 Jun 2020 21:18:40 GMT')
Expand All @@ -16,3 +19,54 @@ def test_etag_header(headers):
assert headers.element('ETag') == 'foo'
assert headers.element('ETag') == '*'
assert headers.element('ETag') != 'bar'


def test_if_match_header(headers):
headers.parse(b'If-Match: W/"foo"')
assert headers.element('If-Match') == 'W/"foo"'
assert headers.element('If-Match').matches('W/"foo"')
assert headers.element('If-Match') != '"foo"'
assert headers.element('If-Match') != 'foo'
assert headers.element('If-Match') != '*'
assert headers.element('If-Match').matches_etag('foo', strong=False)
assert not headers.element('If-Match').matches_etag('foo')


def test_if_match_header_star(headers):
headers.parse(b'If-Match: *')
assert headers.element('If-Match') == 'W/"foo"'


@pytest.mark.parametrize('match', [
'"def456"',
'"abc123", "def456", "ghi789"',
'*',
])
def test_if_match_header_raw(headers, match):
etag = 'def456'
headers.parse(f'If-Match: {match}'.encode())
assert any(condition.matches_etag(etag) for condition in headers.elements('If-Match'))


@pytest.mark.parametrize('match', [
'"def456"',
'W/"def456"',
'"abc123", "def456", "ghi789"',
'*',
])
def test_if_none_match_header_raw(headers, match):
etag = 'def456'
headers.parse(f'If-Match: {match}'.encode())
assert any(condition.matches_etag(etag, strong=False) for condition in headers.elements('If-Match'))


@pytest.mark.parametrize('match', [
'def456',
'w/"def456"',
'W/"def456"',
'"abc123", "ghi789"',
])
def test_if_mismatch_header_raw(headers, match):
etag = 'def456'
headers.parse(f'If-Match: {match}'.encode())
assert not any(condition.matches_etag(etag) for condition in headers.elements('If-Match'))
4 changes: 4 additions & 0 deletions tests/messaging/test_request_method.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
from httoop.exceptions import InvalidLine


def test_method_format(request_):
assert f'{request_.method}' == 'GET'


def test_method_maxlength(request_):
with pytest.raises(InvalidLine):
request_.method.parse(b'A' * 21)
Expand Down
13 changes: 13 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
import subprocess
import sys
import tempfile
Expand All @@ -21,5 +22,17 @@ def test_cli_compose():
assert b'GET / HTTP/1.1\r\n\r\ntest' == stdout


def test_cli_parse():
with tempfile.NamedTemporaryFile() as fd:
fd.write(b'PUT /foo HTTP/1.1\r\nHost: foo\r\n\r\n')
fd.flush()
stdout = subprocess.check_output([sys.executable, '-m', 'httoop', 'parse', 'request', '--file', fd.name])
assert re.match(br"^<HTTP Response\(200 text/plain; charset=UTF\-8\)>\n<HTTP Headers\(\[\('Server', b'httoop/\d+\.\d+\.\d+'\)\]\)>\n<HTTP Body\(0x[0-9a-f]+\)>\n$", stdout)

p = subprocess.Popen([sys.executable, '-m', 'httoop', 'parse', 'response'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
stdout, stderr = p.communicate(b'HTTP/1.1 400 Evil Request\r\n\r\n')
assert re.match(br"^<HTTP Response\(400 text/plain; charset=UTF\-8\)>\n<HTTP Headers\(\[\('Content\-Length', b'0'\)\]\)>\n<HTTP Body\(0x[0-9a-f]+\)>\nb''\n$", stdout), stdout


def test_invalid_input():
assert b'GET / HTTP/1.1\r\n\r\n' == subprocess.check_output([sys.executable, '-m', 'httoop', 'compose', 'request', '--protocol', '1:0', '-H', 'foo'])
1 change: 1 addition & 0 deletions tests/uri/test_uri_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ def test_invalid_uri_characters(char):
with pytest.raises(InvalidURI) as exc:
URI().parse(b'/foo%sbar' % (char,))
assert 'must consist of printable ASCII characters without whitespace.' in str(exc.value)
assert b'must consist of printable ASCII characters without whitespace.' in bytes(exc.value)


@pytest.mark.parametrize('char', [
Expand Down
Loading