Skip to content

Can't run multiple tests with same instance of ReusableClient #63

@slavict

Description

@slavict

How to reproduce

import pytest
from sanic_testing.reusable import ReusableClient
from sanic import Sanic
from sanic import response


@pytest.fixture(scope="session")
def app():
    sanic_app = Sanic("TestSanic")

    @sanic_app.get("/")
    def basic(request):
        return response.text("foo")

    @sanic_app.post("/api/login")
    def basic(request):
        return response.text("foo")

    @sanic_app.get("/api/resources")
    def basic(request):
        return response.text("foo")

    return sanic_app


@pytest.fixture(scope="session")
def cli(app):
    cli = ReusableClient(app)
    return cli


def test_root(cli):
    with cli:
        _, response = cli.get("/")
        assert response.status == 200


def test_login(cli):
    with cli:
        _, response = cli.post("/api/login", )
        assert response.status == 200

        _, response = cli.get("/api/resources")
        assert response.status == 200

Error that I got

self = <_UnixSelectorEventLoop running=False closed=False debug=False>
future = <Task finished name='Task-18' coro=<StartupMixin.create_server() done, defined at /home/ghost/wuw2/lib/python3.10/site-packages/sanic/mixins/startup.py:347> exception=RuntimeError('cannot reuse already awaited coroutine')>

    def run_until_complete(self, future):
        """Run until the Future is done.
    
        If the argument is a coroutine, it is wrapped in a Task.
    
        WARNING: It would be disastrous to call run_until_complete()
        with the same coroutine twice -- it would wrap it in two
        different Tasks and that can't be good.
    
        Return the Future's result, or raise its exception.
        """
        self._check_closed()
        self._check_running()
    
        new_task = not futures.isfuture(future)
        future = tasks.ensure_future(future, loop=self)
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False
    
        future.add_done_callback(_run_until_complete_cb)
        try:
            self.run_forever()
        except:
            if new_task and future.done() and not future.cancelled():
                # The coroutine raised a BaseException. Consume the exception
                # to not log a warning, the caller doesn't have access to the
                # local task.
                future.exception()
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')
    
>       return future.result()
E       RuntimeError: cannot reuse already awaited coroutine

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions