From df7a758e8c60d715a5681877b63ea3a8c1754618 Mon Sep 17 00:00:00 2001 From: TAMARA LIPOWSKI Date: Wed, 15 Nov 2023 18:01:20 -0500 Subject: [PATCH 01/14] Binance.streams: add more debug logs. --- binance/streams.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/binance/streams.py b/binance/streams.py index 0a8833ea3..bc88a6664 100755 --- a/binance/streams.py +++ b/binance/streams.py @@ -77,7 +77,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): await self._conn.__aexit__(exc_type, exc_val, exc_tb) self.ws = None if not self._handle_read_loop: - self._log.error("CANCEL read_loop") + self._log.error(f"{self._path} CANCEL read_loop") await self._kill_read_loop() async def connect(self): @@ -100,6 +100,7 @@ async def connect(self): async def _kill_read_loop(self): self.ws_state = WSListenerState.EXITING while self._handle_read_loop: + self._log.debug("Handle read loop while in _kill_read_loop") await sleep(0.1) async def _before_connect(self): @@ -150,15 +151,15 @@ async def _read_loop(self): }) raise BinanceWebsocketUnableToConnect except asyncio.TimeoutError: - self._log.debug(f"no message in {self.TIMEOUT} seconds") + self._log.debug(f"{self._path} no message in {self.TIMEOUT} seconds") # _no_message_received_reconnect except asyncio.CancelledError as e: - self._log.debug(f"cancelled error {e}") + self._log.debug(f"{self._path} cancelled error {e}") break except asyncio.IncompleteReadError as e: self._log.debug(f"incomplete read error ({e})") except ConnectionClosedError as e: - self._log.debug(f"connection close error ({e})") + self._log.debug(f"{self._path} connection close error ({e})") except gaierror as e: self._log.debug(f"DNS Error ({e})") except BinanceWebsocketUnableToConnect as e: @@ -182,7 +183,7 @@ async def _run_reconnect(self): await asyncio.sleep(reconnect_wait) await self.connect() else: - self._log.error(f'Max reconnections {self.MAX_RECONNECTS} reached:') + self._log.error(f'{self._path} Max reconnections {self.MAX_RECONNECTS} reached:') # Signal the error await self._queue.put({ 'e': 'error', @@ -196,7 +197,7 @@ async def recv(self): try: res = await asyncio.wait_for(self._queue.get(), timeout=self.TIMEOUT) except asyncio.TimeoutError: - self._log.debug(f"no message in {self.TIMEOUT} seconds") + self._log.debug(f"{self._path} no message in {self.TIMEOUT} seconds") return res async def _wait_for_reconnect(self): @@ -214,10 +215,11 @@ async def before_reconnect(self): self._reconnects += 1 def _no_message_received_reconnect(self): - self._log.debug('No message received, reconnecting') + self._log.debug(f'{self._path} No message received, reconnecting') self.ws_state = WSListenerState.RECONNECTING async def _reconnect(self): + self._log.debug(f"{self._path} Reconnecting in _reconnect") self.ws_state = WSListenerState.RECONNECTING From 5e7353480fd4df0bb1c9c0e34ff11953ebd5dbd1 Mon Sep 17 00:00:00 2001 From: TAMARA LIPOWSKI Date: Wed, 15 Nov 2023 18:18:06 -0500 Subject: [PATCH 02/14] Binance.streams: change debug to info logs for easier debugging. --- binance/streams.py | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/binance/streams.py b/binance/streams.py index bc88a6664..bc9528b66 100755 --- a/binance/streams.py +++ b/binance/streams.py @@ -100,7 +100,7 @@ async def connect(self): async def _kill_read_loop(self): self.ws_state = WSListenerState.EXITING while self._handle_read_loop: - self._log.debug("Handle read loop while in _kill_read_loop") + self._log.info("Handle read loop while in _kill_read_loop") await sleep(0.1) async def _before_connect(self): @@ -118,7 +118,7 @@ def _handle_message(self, evt): try: return json.loads(evt) except ValueError: - self._log.debug(f'error parsing evt json:{evt}') + self._log.info(f'error parsing evt json:{evt}') return None async def _read_loop(self): @@ -129,7 +129,7 @@ async def _read_loop(self): await self._run_reconnect() if self.ws_state == WSListenerState.EXITING: - self._log.debug(f"_read_loop {self._path} break for {self.ws_state}") + self._log.info(f"_read_loop {self._path} break for {self.ws_state}") break elif self.ws.state == ws.protocol.State.CLOSING: # type: ignore await asyncio.sleep(0.1) @@ -144,29 +144,29 @@ async def _read_loop(self): if self._queue.qsize() < self.MAX_QUEUE_SIZE: await self._queue.put(res) else: - self._log.debug(f"Queue overflow {self.MAX_QUEUE_SIZE}. Message not filled") + self._log.info(f"Queue overflow {self.MAX_QUEUE_SIZE}. Message not filled") await self._queue.put({ 'e': 'error', 'm': 'Queue overflow. Message not filled' }) raise BinanceWebsocketUnableToConnect except asyncio.TimeoutError: - self._log.debug(f"{self._path} no message in {self.TIMEOUT} seconds") + self._log.info(f"{self._path} no message in {self.TIMEOUT} seconds") # _no_message_received_reconnect except asyncio.CancelledError as e: - self._log.debug(f"{self._path} cancelled error {e}") + self._log.info(f"{self._path} cancelled error {e}") break except asyncio.IncompleteReadError as e: - self._log.debug(f"incomplete read error ({e})") + self._log.info(f"incomplete read error ({e})") except ConnectionClosedError as e: - self._log.debug(f"{self._path} connection close error ({e})") + self._log.info(f"{self._path} connection close error ({e})") except gaierror as e: - self._log.debug(f"DNS Error ({e})") + self._log.info(f"DNS Error ({e})") except BinanceWebsocketUnableToConnect as e: - self._log.debug(f"BinanceWebsocketUnableToConnect ({e})") + self._log.info(f"BinanceWebsocketUnableToConnect ({e})") break except Exception as e: - self._log.debug(f"Unknown exception ({e})") + self._log.info(f"Unknown exception ({e})") continue finally: self._handle_read_loop = None # Signal the coro is stopped @@ -176,7 +176,7 @@ async def _run_reconnect(self): await self.before_reconnect() if self._reconnects < self.MAX_RECONNECTS: reconnect_wait = self._get_reconnect_wait(self._reconnects) - self._log.debug( + self._log.info( f"websocket reconnecting. {self.MAX_RECONNECTS - self._reconnects} reconnects left - " f"waiting {reconnect_wait}" ) @@ -197,7 +197,7 @@ async def recv(self): try: res = await asyncio.wait_for(self._queue.get(), timeout=self.TIMEOUT) except asyncio.TimeoutError: - self._log.debug(f"{self._path} no message in {self.TIMEOUT} seconds") + self._log.info(f"{self._path} no message in {self.TIMEOUT} seconds") return res async def _wait_for_reconnect(self): @@ -215,11 +215,11 @@ async def before_reconnect(self): self._reconnects += 1 def _no_message_received_reconnect(self): - self._log.debug(f'{self._path} No message received, reconnecting') + self._log.info(f'{self._path} No message received, reconnecting') self.ws_state = WSListenerState.RECONNECTING async def _reconnect(self): - self._log.debug(f"{self._path} Reconnecting in _reconnect") + self._log.info(f"{self._path} Reconnecting in _reconnect") self.ws_state = WSListenerState.RECONNECTING @@ -274,11 +274,11 @@ async def _keepalive_socket(self): try: listen_key = await self._get_listen_key() if listen_key != self._path: - self._log.debug("listen key changed: reconnect") + self._log.info("listen key changed: reconnect") self._path = listen_key await self._reconnect() else: - self._log.debug("listen key same: keepalive") + self._log.info("listen key same: keepalive") if self._keepalive_type == 'user': await self._client.stream_keepalive(self._path) elif self._keepalive_type == 'margin': # cross-margin From 01c4954e1e16a69480ad0e54ba58d4ee1d424c5d Mon Sep 17 00:00:00 2001 From: TAMARA LIPOWSKI Date: Thu, 16 Nov 2023 16:19:19 -0500 Subject: [PATCH 03/14] Binance.streams: more logs. --- binance/streams.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/binance/streams.py b/binance/streams.py index bc9528b66..153b5fa3f 100755 --- a/binance/streams.py +++ b/binance/streams.py @@ -88,6 +88,7 @@ async def connect(self): try: self.ws = await self._conn.__aenter__() except: # noqa + self._log.info(f"{self._path} exception caught. Reconnecting.") await self._reconnect() return self.ws_state = WSListenerState.STREAMING @@ -99,6 +100,7 @@ async def connect(self): async def _kill_read_loop(self): self.ws_state = WSListenerState.EXITING + self._log.info(f"_kill_read_loop called for {self._path}") while self._handle_read_loop: self._log.info("Handle read loop while in _kill_read_loop") await sleep(0.1) @@ -132,9 +134,11 @@ async def _read_loop(self): self._log.info(f"_read_loop {self._path} break for {self.ws_state}") break elif self.ws.state == ws.protocol.State.CLOSING: # type: ignore + self._log.info(f"_read_loop {self._path} closing for {self.ws_state}") await asyncio.sleep(0.1) continue elif self.ws.state == ws.protocol.State.CLOSED: # type: ignore + self._log.info(f"_read_loop {self._path} closed for {self.ws_state}") await self._reconnect() elif self.ws_state == WSListenerState.STREAMING: assert self.ws @@ -170,6 +174,7 @@ async def _read_loop(self): continue finally: self._handle_read_loop = None # Signal the coro is stopped + self._log.info(f"Coro {self._path} has stopped.") self._reconnects = 0 async def _run_reconnect(self): From 65515990a2edd7de67dc02e709ecbb8501d272ff Mon Sep 17 00:00:00 2001 From: TAMARA LIPOWSKI Date: Fri, 17 Nov 2023 12:18:04 -0500 Subject: [PATCH 04/14] Binance.streams: don't catch all exceptions. This try: except: lock can be problematic, since all exceptions will be caught here, including asyncio.CancelledError from the parent task, which we do not want to catch. --- binance/streams.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/binance/streams.py b/binance/streams.py index 153b5fa3f..a67f48a87 100755 --- a/binance/streams.py +++ b/binance/streams.py @@ -87,7 +87,7 @@ async def connect(self): self._conn = ws.connect(ws_url, close_timeout=0.1) # type: ignore try: self.ws = await self._conn.__aenter__() - except: # noqa + except Exception: # noqa self._log.info(f"{self._path} exception caught. Reconnecting.") await self._reconnect() return From 64c14fa3b3461a5be293d54950f9de3c972300e3 Mon Sep 17 00:00:00 2001 From: TAMARA LIPOWSKI Date: Mon, 20 Nov 2023 16:58:23 -0500 Subject: [PATCH 05/14] Binance.streams: more logs. --- binance/streams.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/binance/streams.py b/binance/streams.py index a67f48a87..67a03cc34 100755 --- a/binance/streams.py +++ b/binance/streams.py @@ -81,6 +81,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): await self._kill_read_loop() async def connect(self): + self._log.info(f"{self._path} - connect called.") await self._before_connect() assert self._path ws_url = self._url + self._prefix + self._path @@ -96,13 +97,13 @@ async def connect(self): await self._after_connect() # To manage the "cannot call recv while another coroutine is already waiting for the next message" if not self._handle_read_loop: + self._log.info(f"{self._path} call_soon_threadsafe called.") self._handle_read_loop = self._loop.call_soon_threadsafe(asyncio.create_task, self._read_loop()) async def _kill_read_loop(self): self.ws_state = WSListenerState.EXITING - self._log.info(f"_kill_read_loop called for {self._path}") while self._handle_read_loop: - self._log.info("Handle read loop while in _kill_read_loop") + self._log.info(f"{self._path} Handle read loop while in _kill_read_loop") await sleep(0.1) async def _before_connect(self): @@ -124,23 +125,26 @@ def _handle_message(self, evt): return None async def _read_loop(self): + self._log.info(f"_read_loop started for symbol {self._path}") try: while True: try: while self.ws_state == WSListenerState.RECONNECTING: + self._log.info(f"_read_loop {self._path} reconnecting.") await self._run_reconnect() if self.ws_state == WSListenerState.EXITING: self._log.info(f"_read_loop {self._path} break for {self.ws_state}") break elif self.ws.state == ws.protocol.State.CLOSING: # type: ignore - self._log.info(f"_read_loop {self._path} closing for {self.ws_state}") + self._log.info(f"_read_loop {self._path} closing.") await asyncio.sleep(0.1) continue elif self.ws.state == ws.protocol.State.CLOSED: # type: ignore - self._log.info(f"_read_loop {self._path} closed for {self.ws_state}") + self._log.info(f"_read_loop {self._path} closed. Reconnecting.") await self._reconnect() elif self.ws_state == WSListenerState.STREAMING: + self._log.info(f"_read_loop {self._path} streaming.") assert self.ws res = await asyncio.wait_for(self.ws.recv(), timeout=self.TIMEOUT) res = self._handle_message(res) From 4ed900a2bf4be6b9724dbbb63a88c378fb2cc8e7 Mon Sep 17 00:00:00 2001 From: TAMARA LIPOWSKI Date: Mon, 20 Nov 2023 22:37:24 -0500 Subject: [PATCH 06/14] Binance.streams: remove logs when state is WSListenerState.STREAMING. --- binance/streams.py | 1 - 1 file changed, 1 deletion(-) diff --git a/binance/streams.py b/binance/streams.py index 67a03cc34..d7e3d786b 100755 --- a/binance/streams.py +++ b/binance/streams.py @@ -144,7 +144,6 @@ async def _read_loop(self): self._log.info(f"_read_loop {self._path} closed. Reconnecting.") await self._reconnect() elif self.ws_state == WSListenerState.STREAMING: - self._log.info(f"_read_loop {self._path} streaming.") assert self.ws res = await asyncio.wait_for(self.ws.recv(), timeout=self.TIMEOUT) res = self._handle_message(res) From 0cb6f867df761edfdf0c4dda362dd197b1109fb5 Mon Sep 17 00:00:00 2001 From: TAMARA LIPOWSKI Date: Tue, 21 Nov 2023 10:42:05 -0500 Subject: [PATCH 07/14] Binance.streams: specify if `no message` log comes from recv --- binance/streams.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/binance/streams.py b/binance/streams.py index d7e3d786b..d6377769c 100755 --- a/binance/streams.py +++ b/binance/streams.py @@ -205,7 +205,7 @@ async def recv(self): try: res = await asyncio.wait_for(self._queue.get(), timeout=self.TIMEOUT) except asyncio.TimeoutError: - self._log.info(f"{self._path} no message in {self.TIMEOUT} seconds") + self._log.info(f"{self._path} recv no message in {self.TIMEOUT} seconds") return res async def _wait_for_reconnect(self): From b0c2e4c94cb73f75d2c0a76743a589e34cd8b805 Mon Sep 17 00:00:00 2001 From: TAMARA LIPOWSKI Date: Thu, 23 Nov 2023 12:03:46 -0500 Subject: [PATCH 08/14] Binance.streams: re-raise on cancellation error. Not raising here and simply catching the error means the parent task will fail to be properly cancelled if the parent receives a cancellation error. --- binance/streams.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/binance/streams.py b/binance/streams.py index d6377769c..669fbe9a0 100755 --- a/binance/streams.py +++ b/binance/streams.py @@ -162,7 +162,7 @@ async def _read_loop(self): # _no_message_received_reconnect except asyncio.CancelledError as e: self._log.info(f"{self._path} cancelled error {e}") - break + raise e except asyncio.IncompleteReadError as e: self._log.info(f"incomplete read error ({e})") except ConnectionClosedError as e: From e014e0791da797cabab5f6157aa513d1e26009d2 Mon Sep 17 00:00:00 2001 From: TAMARA LIPOWSKI Date: Thu, 23 Nov 2023 17:34:52 -0500 Subject: [PATCH 09/14] Binance.depthcache: Better warning message on exception. --- binance/depthcache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/binance/depthcache.py b/binance/depthcache.py index e59eb79e8..70e96557a 100644 --- a/binance/depthcache.py +++ b/binance/depthcache.py @@ -182,7 +182,7 @@ async def recv(self): try: res = await asyncio.wait_for(self._socket.recv(), timeout=self.TIMEOUT) except Exception as e: - self._log.warning(e) + self._log.warning(f"An exception occurred: {e}", exc_info=True) else: dc = await self._depth_event(res) return dc From 57af8ee558a666b7e7957524978d99cfdcaf7fc0 Mon Sep 17 00:00:00 2001 From: TAMARA LIPOWSKI Date: Thu, 23 Nov 2023 17:35:29 -0500 Subject: [PATCH 10/14] Binance.streams: yield control back to event loop in recv. - This allows to check for cancellations. --- binance/streams.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/binance/streams.py b/binance/streams.py index 669fbe9a0..d64dd84a1 100755 --- a/binance/streams.py +++ b/binance/streams.py @@ -206,6 +206,10 @@ async def recv(self): res = await asyncio.wait_for(self._queue.get(), timeout=self.TIMEOUT) except asyncio.TimeoutError: self._log.info(f"{self._path} recv no message in {self.TIMEOUT} seconds") + + # Yield control to the event loop + await asyncio.sleep(0) + return res async def _wait_for_reconnect(self): From 889f63c72f63405d8617f4b607710dfb64938580 Mon Sep 17 00:00:00 2001 From: TAMARA LIPOWSKI Date: Fri, 24 Nov 2023 11:10:55 -0500 Subject: [PATCH 11/14] Binance.depthcache: remove spam traceback log. --- binance/depthcache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/binance/depthcache.py b/binance/depthcache.py index 70e96557a..e59eb79e8 100644 --- a/binance/depthcache.py +++ b/binance/depthcache.py @@ -182,7 +182,7 @@ async def recv(self): try: res = await asyncio.wait_for(self._socket.recv(), timeout=self.TIMEOUT) except Exception as e: - self._log.warning(f"An exception occurred: {e}", exc_info=True) + self._log.warning(e) else: dc = await self._depth_event(res) return dc From 618201389a87244b24466ff64b65c6d095a63f0d Mon Sep 17 00:00:00 2001 From: TAMARA LIPOWSKI Date: Wed, 6 Dec 2023 08:40:52 -0500 Subject: [PATCH 12/14] Binance.streams: Revert info logs to debug. --- binance/streams.py | 46 +++++++++++++++++++++++----------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/binance/streams.py b/binance/streams.py index d64dd84a1..092822ac5 100755 --- a/binance/streams.py +++ b/binance/streams.py @@ -81,7 +81,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): await self._kill_read_loop() async def connect(self): - self._log.info(f"{self._path} - connect called.") + self._log.debug(f"{self._path} - connect called.") await self._before_connect() assert self._path ws_url = self._url + self._prefix + self._path @@ -89,7 +89,7 @@ async def connect(self): try: self.ws = await self._conn.__aenter__() except Exception: # noqa - self._log.info(f"{self._path} exception caught. Reconnecting.") + self._log.debug(f"{self._path} exception caught. Reconnecting.") await self._reconnect() return self.ws_state = WSListenerState.STREAMING @@ -97,13 +97,13 @@ async def connect(self): await self._after_connect() # To manage the "cannot call recv while another coroutine is already waiting for the next message" if not self._handle_read_loop: - self._log.info(f"{self._path} call_soon_threadsafe called.") + self._log.debug(f"{self._path} call_soon_threadsafe called.") self._handle_read_loop = self._loop.call_soon_threadsafe(asyncio.create_task, self._read_loop()) async def _kill_read_loop(self): self.ws_state = WSListenerState.EXITING while self._handle_read_loop: - self._log.info(f"{self._path} Handle read loop while in _kill_read_loop") + self._log.debug(f"{self._path} Handle read loop while in _kill_read_loop") await sleep(0.1) async def _before_connect(self): @@ -121,27 +121,27 @@ def _handle_message(self, evt): try: return json.loads(evt) except ValueError: - self._log.info(f'error parsing evt json:{evt}') + self._log.debug(f'error parsing evt json:{evt}') return None async def _read_loop(self): - self._log.info(f"_read_loop started for symbol {self._path}") + self._log.debug(f"_read_loop started for symbol {self._path}") try: while True: try: while self.ws_state == WSListenerState.RECONNECTING: - self._log.info(f"_read_loop {self._path} reconnecting.") + self._log.debug(f"_read_loop {self._path} reconnecting.") await self._run_reconnect() if self.ws_state == WSListenerState.EXITING: - self._log.info(f"_read_loop {self._path} break for {self.ws_state}") + self._log.debug(f"_read_loop {self._path} break for {self.ws_state}") break elif self.ws.state == ws.protocol.State.CLOSING: # type: ignore - self._log.info(f"_read_loop {self._path} closing.") + self._log.debug(f"_read_loop {self._path} closing.") await asyncio.sleep(0.1) continue elif self.ws.state == ws.protocol.State.CLOSED: # type: ignore - self._log.info(f"_read_loop {self._path} closed. Reconnecting.") + self._log.debug(f"_read_loop {self._path} closed. Reconnecting.") await self._reconnect() elif self.ws_state == WSListenerState.STREAMING: assert self.ws @@ -151,40 +151,40 @@ async def _read_loop(self): if self._queue.qsize() < self.MAX_QUEUE_SIZE: await self._queue.put(res) else: - self._log.info(f"Queue overflow {self.MAX_QUEUE_SIZE}. Message not filled") + self._log.debug(f"Queue overflow {self.MAX_QUEUE_SIZE}. Message not filled") await self._queue.put({ 'e': 'error', 'm': 'Queue overflow. Message not filled' }) raise BinanceWebsocketUnableToConnect except asyncio.TimeoutError: - self._log.info(f"{self._path} no message in {self.TIMEOUT} seconds") + self._log.debug(f"{self._path} no message in {self.TIMEOUT} seconds") # _no_message_received_reconnect except asyncio.CancelledError as e: - self._log.info(f"{self._path} cancelled error {e}") + self._log.debug(f"{self._path} cancelled error {e}") raise e except asyncio.IncompleteReadError as e: - self._log.info(f"incomplete read error ({e})") + self._log.debug(f"incomplete read error ({e})") except ConnectionClosedError as e: - self._log.info(f"{self._path} connection close error ({e})") + self._log.debug(f"{self._path} connection close error ({e})") except gaierror as e: - self._log.info(f"DNS Error ({e})") + self._log.debug(f"DNS Error ({e})") except BinanceWebsocketUnableToConnect as e: - self._log.info(f"BinanceWebsocketUnableToConnect ({e})") + self._log.debug(f"BinanceWebsocketUnableToConnect ({e})") break except Exception as e: - self._log.info(f"Unknown exception ({e})") + self._log.debug(f"Unknown exception ({e})") continue finally: self._handle_read_loop = None # Signal the coro is stopped - self._log.info(f"Coro {self._path} has stopped.") + self._log.debug(f"Coro {self._path} has stopped.") self._reconnects = 0 async def _run_reconnect(self): await self.before_reconnect() if self._reconnects < self.MAX_RECONNECTS: reconnect_wait = self._get_reconnect_wait(self._reconnects) - self._log.info( + self._log.debug( f"websocket reconnecting. {self.MAX_RECONNECTS - self._reconnects} reconnects left - " f"waiting {reconnect_wait}" ) @@ -205,7 +205,7 @@ async def recv(self): try: res = await asyncio.wait_for(self._queue.get(), timeout=self.TIMEOUT) except asyncio.TimeoutError: - self._log.info(f"{self._path} recv no message in {self.TIMEOUT} seconds") + self._log.debug(f"{self._path} recv no message in {self.TIMEOUT} seconds") # Yield control to the event loop await asyncio.sleep(0) @@ -227,11 +227,11 @@ async def before_reconnect(self): self._reconnects += 1 def _no_message_received_reconnect(self): - self._log.info(f'{self._path} No message received, reconnecting') + self._log.debug(f'{self._path} No message received, reconnecting') self.ws_state = WSListenerState.RECONNECTING async def _reconnect(self): - self._log.info(f"{self._path} Reconnecting in _reconnect") + self._log.debug(f"{self._path} Reconnecting in _reconnect") self.ws_state = WSListenerState.RECONNECTING From bfa337cd4d88669c9d45253af9804021bbaee870 Mon Sep 17 00:00:00 2001 From: TAMARA LIPOWSKI Date: Wed, 6 Dec 2023 08:46:28 -0500 Subject: [PATCH 13/14] Binance.streams: Remove unnecessary debug logs. --- binance/streams.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/binance/streams.py b/binance/streams.py index 092822ac5..fae81b0ea 100755 --- a/binance/streams.py +++ b/binance/streams.py @@ -130,18 +130,15 @@ async def _read_loop(self): while True: try: while self.ws_state == WSListenerState.RECONNECTING: - self._log.debug(f"_read_loop {self._path} reconnecting.") await self._run_reconnect() if self.ws_state == WSListenerState.EXITING: self._log.debug(f"_read_loop {self._path} break for {self.ws_state}") break elif self.ws.state == ws.protocol.State.CLOSING: # type: ignore - self._log.debug(f"_read_loop {self._path} closing.") await asyncio.sleep(0.1) continue elif self.ws.state == ws.protocol.State.CLOSED: # type: ignore - self._log.debug(f"_read_loop {self._path} closed. Reconnecting.") await self._reconnect() elif self.ws_state == WSListenerState.STREAMING: assert self.ws @@ -177,7 +174,6 @@ async def _read_loop(self): continue finally: self._handle_read_loop = None # Signal the coro is stopped - self._log.debug(f"Coro {self._path} has stopped.") self._reconnects = 0 async def _run_reconnect(self): @@ -227,11 +223,9 @@ async def before_reconnect(self): self._reconnects += 1 def _no_message_received_reconnect(self): - self._log.debug(f'{self._path} No message received, reconnecting') self.ws_state = WSListenerState.RECONNECTING async def _reconnect(self): - self._log.debug(f"{self._path} Reconnecting in _reconnect") self.ws_state = WSListenerState.RECONNECTING From 6a9c45f1dbb05c520ffddb6080895b54859f4ac2 Mon Sep 17 00:00:00 2001 From: TAMARA LIPOWSKI Date: Wed, 6 Dec 2023 08:48:28 -0500 Subject: [PATCH 14/14] Binance.streams: Remove more spammy logs. --- binance/streams.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/binance/streams.py b/binance/streams.py index fae81b0ea..5fe792dfa 100755 --- a/binance/streams.py +++ b/binance/streams.py @@ -81,7 +81,6 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): await self._kill_read_loop() async def connect(self): - self._log.debug(f"{self._path} - connect called.") await self._before_connect() assert self._path ws_url = self._url + self._prefix + self._path @@ -97,13 +96,11 @@ async def connect(self): await self._after_connect() # To manage the "cannot call recv while another coroutine is already waiting for the next message" if not self._handle_read_loop: - self._log.debug(f"{self._path} call_soon_threadsafe called.") self._handle_read_loop = self._loop.call_soon_threadsafe(asyncio.create_task, self._read_loop()) async def _kill_read_loop(self): self.ws_state = WSListenerState.EXITING while self._handle_read_loop: - self._log.debug(f"{self._path} Handle read loop while in _kill_read_loop") await sleep(0.1) async def _before_connect(self): @@ -280,11 +277,11 @@ async def _keepalive_socket(self): try: listen_key = await self._get_listen_key() if listen_key != self._path: - self._log.info("listen key changed: reconnect") + self._log.debug("listen key changed: reconnect") self._path = listen_key await self._reconnect() else: - self._log.info("listen key same: keepalive") + self._log.debug("listen key same: keepalive") if self._keepalive_type == 'user': await self._client.stream_keepalive(self._path) elif self._keepalive_type == 'margin': # cross-margin