diff --git a/binance/streams.py b/binance/streams.py index 0a8833ea3..5fe792dfa 100755 --- a/binance/streams.py +++ b/binance/streams.py @@ -77,7 +77,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): await self._conn.__aexit__(exc_type, exc_val, exc_tb) self.ws = None if not self._handle_read_loop: - self._log.error("CANCEL read_loop") + self._log.error(f"{self._path} CANCEL read_loop") await self._kill_read_loop() async def connect(self): @@ -87,7 +87,8 @@ async def connect(self): self._conn = ws.connect(ws_url, close_timeout=0.1) # type: ignore try: self.ws = await self._conn.__aenter__() - except: # noqa + except Exception: # noqa + self._log.debug(f"{self._path} exception caught. Reconnecting.") await self._reconnect() return self.ws_state = WSListenerState.STREAMING @@ -121,6 +122,7 @@ def _handle_message(self, evt): return None async def _read_loop(self): + self._log.debug(f"_read_loop started for symbol {self._path}") try: while True: try: @@ -150,15 +152,15 @@ async def _read_loop(self): }) raise BinanceWebsocketUnableToConnect except asyncio.TimeoutError: - self._log.debug(f"no message in {self.TIMEOUT} seconds") + self._log.debug(f"{self._path} no message in {self.TIMEOUT} seconds") # _no_message_received_reconnect except asyncio.CancelledError as e: - self._log.debug(f"cancelled error {e}") - break + self._log.debug(f"{self._path} cancelled error {e}") + raise e except asyncio.IncompleteReadError as e: self._log.debug(f"incomplete read error ({e})") except ConnectionClosedError as e: - self._log.debug(f"connection close error ({e})") + self._log.debug(f"{self._path} connection close error ({e})") except gaierror as e: self._log.debug(f"DNS Error ({e})") except BinanceWebsocketUnableToConnect as e: @@ -182,7 +184,7 @@ async def _run_reconnect(self): await asyncio.sleep(reconnect_wait) await self.connect() else: - self._log.error(f'Max reconnections {self.MAX_RECONNECTS} reached:') + self._log.error(f'{self._path} Max reconnections {self.MAX_RECONNECTS} reached:') # Signal the error await self._queue.put({ 'e': 'error', @@ -196,7 +198,11 @@ async def recv(self): try: res = await asyncio.wait_for(self._queue.get(), timeout=self.TIMEOUT) except asyncio.TimeoutError: - self._log.debug(f"no message in {self.TIMEOUT} seconds") + self._log.debug(f"{self._path} recv no message in {self.TIMEOUT} seconds") + + # Yield control to the event loop + await asyncio.sleep(0) + return res async def _wait_for_reconnect(self): @@ -214,7 +220,6 @@ async def before_reconnect(self): self._reconnects += 1 def _no_message_received_reconnect(self): - self._log.debug('No message received, reconnecting') self.ws_state = WSListenerState.RECONNECTING async def _reconnect(self):