diff --git a/hived/log.py b/hived/log.py index f346e48..0617d61 100644 --- a/hived/log.py +++ b/hived/log.py @@ -1,7 +1,7 @@ import logging - from time import gmtime +from traceback import format_exception import simplejson as json from hived import trail @@ -33,4 +33,15 @@ def format(self, record): if trail.get_id(): record_dict['_trail_id'] = trail.get_id() - return "{}{}".format(conf.LOG_PREFIX, json.dumps(record_dict, default=json_handler)) + if record.exc_info: + etype, exc, tb = record.exc_info + record_dict['error'] = { + 'etype': etype.__name__, + 'exc': exc, + 'locals': {k: repr(v) + for k, v in tb.tb_frame.f_locals.iteritems() + if not k.startswith('__')}, + 'traceback': format_exception(etype, exc, tb), + } + + return '{}{}'.format(conf.LOG_PREFIX, json.dumps(record_dict, default=json_handler)) diff --git a/hived/queue.py b/hived/queue.py index 7aa5f3c..c26ee70 100644 --- a/hived/queue.py +++ b/hived/queue.py @@ -89,18 +89,19 @@ def close(self): if self.connection is not None: self.connection.close() - def _try(self, method, _tries=1, **kwargs): + def _try(self, method, **kwargs): if self.channel is None: self._connect() - try: - return getattr(self.channel, method)(**kwargs) - except (AMQPError, IOError) as e: - if _tries < MAX_TRIES: - self._connect() - return self._try(method, _tries + 1, **kwargs) - else: - raise ConnectionError(e) + for attempt in xrange(MAX_TRIES, 0, -1): + try: + return getattr(self.channel, method)(**kwargs) + + except (AMQPError, IOError): + if attempt > 1: + time.sleep(.5) + else: + raise def _subscribe(self): self.default_queue_name = '%s_%s' % (self.subscription, uuid.uuid4()) @@ -116,7 +117,8 @@ def subscribe(self, routing_key): self.subscription = routing_key self._connect() - def put(self, message_dict=None, routing_key='', exchange=None, body=None, priority=0): + def put(self, message_dict=None, routing_key='', exchange=None, body=None, + priority=0, mandatory=False): """ Publishes a message to the queue. message_dict: the json-serializable object that will be published @@ -143,6 +145,7 @@ def put(self, message_dict=None, routing_key='', exchange=None, body=None, prior return self._try('basic_publish', msg=message, exchange=exchange, + mandatory=mandatory, routing_key=routing_key) def _parse_message(self, message): diff --git a/tests/test_queue.py b/tests/test_queue.py index 385bd34..e380196 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -2,7 +2,7 @@ import json import unittest -from amqp import Message, AMQPError, ConnectionError +from amqp import Message, AMQPError, ConnectionError, Connection from mock import MagicMock, patch, call, Mock, ANY from hived.queue import (ExternalQueue, MAX_TRIES, SerializationError, @@ -27,12 +27,13 @@ def setUp(self): self.channel_mock = MagicMock() self.channel_mock.basic_get.return_value = self.message + self.channel_mock.basic_publish.return_value = None self.connection = MagicMock() self.connection.channel.return_value = self.channel_mock self.connection_cls_patcher = patch('amqp.Connection', - return_value=self.connection) + return_value=self.connection) self.connection_cls_mock = self.connection_cls_patcher.start() self.external_queue = ExternalQueue('localhost', 'username', 'pwd', @@ -79,8 +80,8 @@ def test__try_tries_up_to_max_tries(self): self.assertEqual(rv, 'rv') def test__try_doesnt_try_more_than_max_tries(self): - self.channel_mock.method.side_effect = [AMQPError, AMQPError, AMQPError, 'rv'] - self.assertRaises(ConnectionError, self.external_queue._try, 'method') + self.channel_mock.method.side_effect = [AMQPError, AMQPError, AMQPError] + self.assertRaises(AMQPError, self.external_queue._try, 'method') def test_put_uses_default_exchange_if_not_supplied(self): amqp_msg = Message('body', delivery_mode=2, content_type='application/json', priority=0) @@ -88,6 +89,7 @@ def test_put_uses_default_exchange_if_not_supplied(self): self.assertEqual(self.channel_mock.basic_publish.call_args_list, [call(msg=amqp_msg, exchange='default_exchange', + mandatory=False, routing_key='')]) def test_add_trail_keys(self):