From dbcb9bbb18f8cee3bd661ac754989e6a28c8de62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rodrigo=20Cacilh=CE=B1=CF=82?= Date: Wed, 27 Jan 2016 18:41:58 -0200 Subject: [PATCH 1/7] suggestion of exception logging with no need to duplicate the code --- hived/log.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/hived/log.py b/hived/log.py index f346e48..0617d61 100644 --- a/hived/log.py +++ b/hived/log.py @@ -1,7 +1,7 @@ import logging - from time import gmtime +from traceback import format_exception import simplejson as json from hived import trail @@ -33,4 +33,15 @@ def format(self, record): if trail.get_id(): record_dict['_trail_id'] = trail.get_id() - return "{}{}".format(conf.LOG_PREFIX, json.dumps(record_dict, default=json_handler)) + if record.exc_info: + etype, exc, tb = record.exc_info + record_dict['error'] = { + 'etype': etype.__name__, + 'exc': exc, + 'locals': {k: repr(v) + for k, v in tb.tb_frame.f_locals.iteritems() + if not k.startswith('__')}, + 'traceback': format_exception(etype, exc, tb), + } + + return '{}{}'.format(conf.LOG_PREFIX, json.dumps(record_dict, default=json_handler)) From bd98df12da429525eba552a75853f0511c317b7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=84=9Codrigo=20=E2=84=ADacilh=CE=B1=CF=82?= Date: Wed, 17 Feb 2016 11:11:03 -0200 Subject: [PATCH 2/7] do not override traceback on external queue AMQPError/IOError --- hived/queue.py | 4 ++-- tests/test_queue.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/hived/queue.py b/hived/queue.py index 7aa5f3c..f876d14 100644 --- a/hived/queue.py +++ b/hived/queue.py @@ -95,12 +95,12 @@ def _try(self, method, _tries=1, **kwargs): try: return getattr(self.channel, method)(**kwargs) - except (AMQPError, IOError) as e: + except (AMQPError, IOError): if _tries < MAX_TRIES: self._connect() return self._try(method, _tries + 1, **kwargs) else: - raise ConnectionError(e) + raise def _subscribe(self): self.default_queue_name = '%s_%s' % (self.subscription, uuid.uuid4()) diff --git a/tests/test_queue.py b/tests/test_queue.py index 385bd34..a1a9742 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -80,7 +80,7 @@ def test__try_tries_up_to_max_tries(self): def test__try_doesnt_try_more_than_max_tries(self): self.channel_mock.method.side_effect = [AMQPError, AMQPError, AMQPError, 'rv'] - self.assertRaises(ConnectionError, self.external_queue._try, 'method') + self.assertRaises(AMQPError, self.external_queue._try, 'method') def test_put_uses_default_exchange_if_not_supplied(self): amqp_msg = Message('body', delivery_mode=2, content_type='application/json', priority=0) From 6ed00c07bae281966f7c9af48bb091452cf22afa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=84=9Codrigo=20=E2=84=ADacilh=CE=B1=CF=82?= Date: Wed, 17 Feb 2016 11:17:01 -0200 Subject: [PATCH 3/7] do not enqueue stack frames for retrying --- hived/queue.py | 19 ++++++++++--------- tests/test_queue.py | 2 +- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/hived/queue.py b/hived/queue.py index f876d14..20b76cc 100644 --- a/hived/queue.py +++ b/hived/queue.py @@ -89,18 +89,19 @@ def close(self): if self.connection is not None: self.connection.close() - def _try(self, method, _tries=1, **kwargs): + def _try(self, method, **kwargs): if self.channel is None: self._connect() - try: - return getattr(self.channel, method)(**kwargs) - except (AMQPError, IOError): - if _tries < MAX_TRIES: - self._connect() - return self._try(method, _tries + 1, **kwargs) - else: - raise + for attempt in xrange(1, MAX_TRIES + 1): + try: + return getattr(self.channel, method)(**kwargs) + + except (AMQPError, IOError): + if attempt < MAX_TRIES: + self._connect() + else: + raise def _subscribe(self): self.default_queue_name = '%s_%s' % (self.subscription, uuid.uuid4()) diff --git a/tests/test_queue.py b/tests/test_queue.py index a1a9742..ca8b41e 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -79,7 +79,7 @@ def test__try_tries_up_to_max_tries(self): self.assertEqual(rv, 'rv') def test__try_doesnt_try_more_than_max_tries(self): - self.channel_mock.method.side_effect = [AMQPError, AMQPError, AMQPError, 'rv'] + self.channel_mock.method.side_effect = [AMQPError, AMQPError, AMQPError] self.assertRaises(AMQPError, self.external_queue._try, 'method') def test_put_uses_default_exchange_if_not_supplied(self): From 1e817b07ee6128442df212a3853b943013748a93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=84=9Codrigo=20=E2=84=ADacilh=CE=B1=CF=82?= Date: Wed, 17 Feb 2016 14:19:26 -0200 Subject: [PATCH 4/7] return an unroutable message with a return method XXX: the server should implement the mandatory flag --- hived/queue.py | 1 + tests/test_queue.py | 6 ++++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/hived/queue.py b/hived/queue.py index 20b76cc..a25509c 100644 --- a/hived/queue.py +++ b/hived/queue.py @@ -144,6 +144,7 @@ def put(self, message_dict=None, routing_key='', exchange=None, body=None, prior return self._try('basic_publish', msg=message, exchange=exchange, + mandatory=True, routing_key=routing_key) def _parse_message(self, message): diff --git a/tests/test_queue.py b/tests/test_queue.py index ca8b41e..489f928 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -2,7 +2,7 @@ import json import unittest -from amqp import Message, AMQPError, ConnectionError +from amqp import Message, AMQPError, ConnectionError, Connection from mock import MagicMock, patch, call, Mock, ANY from hived.queue import (ExternalQueue, MAX_TRIES, SerializationError, @@ -27,12 +27,13 @@ def setUp(self): self.channel_mock = MagicMock() self.channel_mock.basic_get.return_value = self.message + self.channel_mock.basic_publish.return_value = None self.connection = MagicMock() self.connection.channel.return_value = self.channel_mock self.connection_cls_patcher = patch('amqp.Connection', - return_value=self.connection) + return_value=self.connection) self.connection_cls_mock = self.connection_cls_patcher.start() self.external_queue = ExternalQueue('localhost', 'username', 'pwd', @@ -88,6 +89,7 @@ def test_put_uses_default_exchange_if_not_supplied(self): self.assertEqual(self.channel_mock.basic_publish.call_args_list, [call(msg=amqp_msg, exchange='default_exchange', + mandatory=True, routing_key='')]) def test_add_trail_keys(self): From 0010e632e7e9e1adaf3b22a09e1fc60ded14e72c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=84=9Codrigo=20=E2=84=ADacilh=CE=B1=CF=82?= Date: Wed, 17 Feb 2016 14:26:33 -0200 Subject: [PATCH 5/7] set mandatory default to false --- hived/queue.py | 5 +++-- tests/test_queue.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/hived/queue.py b/hived/queue.py index a25509c..913f3d6 100644 --- a/hived/queue.py +++ b/hived/queue.py @@ -117,7 +117,8 @@ def subscribe(self, routing_key): self.subscription = routing_key self._connect() - def put(self, message_dict=None, routing_key='', exchange=None, body=None, priority=0): + def put(self, message_dict=None, routing_key='', exchange=None, body=None, + priority=0, mandatory=False): """ Publishes a message to the queue. message_dict: the json-serializable object that will be published @@ -144,7 +145,7 @@ def put(self, message_dict=None, routing_key='', exchange=None, body=None, prior return self._try('basic_publish', msg=message, exchange=exchange, - mandatory=True, + mandatory=mandatory, routing_key=routing_key) def _parse_message(self, message): diff --git a/tests/test_queue.py b/tests/test_queue.py index 489f928..e380196 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -89,7 +89,7 @@ def test_put_uses_default_exchange_if_not_supplied(self): self.assertEqual(self.channel_mock.basic_publish.call_args_list, [call(msg=amqp_msg, exchange='default_exchange', - mandatory=True, + mandatory=False, routing_key='')]) def test_add_trail_keys(self): From 41623753cf96d7da73acd48fa6d96526ed4b3683 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=84=9Codrigo=20=E2=84=ADacilh=CE=B1=CF=82?= Date: Tue, 26 Apr 2016 14:55:53 -0300 Subject: [PATCH 6/7] BUGFIX: fix infinity loop --- hived/queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hived/queue.py b/hived/queue.py index 913f3d6..2d701cd 100644 --- a/hived/queue.py +++ b/hived/queue.py @@ -99,7 +99,7 @@ def _try(self, method, **kwargs): except (AMQPError, IOError): if attempt < MAX_TRIES: - self._connect() + time.sleep(.5) else: raise From 69f911dd020c8a622dcc1cdd20d4eabc0b9fb5bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=84=9Codrigo=20=E2=84=ADacilh=CE=B1=CF=82?= Date: Tue, 26 Apr 2016 15:00:28 -0300 Subject: [PATCH 7/7] finetune --- hived/queue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hived/queue.py b/hived/queue.py index 2d701cd..c26ee70 100644 --- a/hived/queue.py +++ b/hived/queue.py @@ -93,12 +93,12 @@ def _try(self, method, **kwargs): if self.channel is None: self._connect() - for attempt in xrange(1, MAX_TRIES + 1): + for attempt in xrange(MAX_TRIES, 0, -1): try: return getattr(self.channel, method)(**kwargs) except (AMQPError, IOError): - if attempt < MAX_TRIES: + if attempt > 1: time.sleep(.5) else: raise