Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions hived/log.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging

from time import gmtime

from traceback import format_exception
import simplejson as json

from hived import trail
Expand Down Expand Up @@ -33,4 +33,15 @@ def format(self, record):
if trail.get_id():
record_dict['_trail_id'] = trail.get_id()

return "{}{}".format(conf.LOG_PREFIX, json.dumps(record_dict, default=json_handler))
if record.exc_info:
etype, exc, tb = record.exc_info
record_dict['error'] = {
'etype': etype.__name__,
'exc': exc,
'locals': {k: repr(v)
for k, v in tb.tb_frame.f_locals.iteritems()
if not k.startswith('__')},
'traceback': format_exception(etype, exc, tb),
}

return '{}{}'.format(conf.LOG_PREFIX, json.dumps(record_dict, default=json_handler))
23 changes: 13 additions & 10 deletions hived/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,19 @@ def close(self):
if self.connection is not None:
self.connection.close()

def _try(self, method, _tries=1, **kwargs):
def _try(self, method, **kwargs):
if self.channel is None:
self._connect()

try:
return getattr(self.channel, method)(**kwargs)
except (AMQPError, IOError) as e:
if _tries < MAX_TRIES:
self._connect()
return self._try(method, _tries + 1, **kwargs)
else:
raise ConnectionError(e)
for attempt in xrange(MAX_TRIES, 0, -1):
try:
return getattr(self.channel, method)(**kwargs)

except (AMQPError, IOError):
if attempt > 1:
time.sleep(.5)
else:
raise

def _subscribe(self):
self.default_queue_name = '%s_%s' % (self.subscription, uuid.uuid4())
Expand All @@ -116,7 +117,8 @@ def subscribe(self, routing_key):
self.subscription = routing_key
self._connect()

def put(self, message_dict=None, routing_key='', exchange=None, body=None, priority=0):
def put(self, message_dict=None, routing_key='', exchange=None, body=None,
priority=0, mandatory=False):
"""
Publishes a message to the queue.
message_dict: the json-serializable object that will be published
Expand All @@ -143,6 +145,7 @@ def put(self, message_dict=None, routing_key='', exchange=None, body=None, prior
return self._try('basic_publish',
msg=message,
exchange=exchange,
mandatory=mandatory,
routing_key=routing_key)

def _parse_message(self, message):
Expand Down
10 changes: 6 additions & 4 deletions tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import unittest

from amqp import Message, AMQPError, ConnectionError
from amqp import Message, AMQPError, ConnectionError, Connection
from mock import MagicMock, patch, call, Mock, ANY

from hived.queue import (ExternalQueue, MAX_TRIES, SerializationError,
Expand All @@ -27,12 +27,13 @@ def setUp(self):

self.channel_mock = MagicMock()
self.channel_mock.basic_get.return_value = self.message
self.channel_mock.basic_publish.return_value = None

self.connection = MagicMock()
self.connection.channel.return_value = self.channel_mock

self.connection_cls_patcher = patch('amqp.Connection',
return_value=self.connection)
return_value=self.connection)
self.connection_cls_mock = self.connection_cls_patcher.start()

self.external_queue = ExternalQueue('localhost', 'username', 'pwd',
Expand Down Expand Up @@ -79,15 +80,16 @@ def test__try_tries_up_to_max_tries(self):
self.assertEqual(rv, 'rv')

def test__try_doesnt_try_more_than_max_tries(self):
self.channel_mock.method.side_effect = [AMQPError, AMQPError, AMQPError, 'rv']
self.assertRaises(ConnectionError, self.external_queue._try, 'method')
self.channel_mock.method.side_effect = [AMQPError, AMQPError, AMQPError]
self.assertRaises(AMQPError, self.external_queue._try, 'method')

def test_put_uses_default_exchange_if_not_supplied(self):
amqp_msg = Message('body', delivery_mode=2, content_type='application/json', priority=0)
self.external_queue.put(body='body')
self.assertEqual(self.channel_mock.basic_publish.call_args_list,
[call(msg=amqp_msg,
exchange='default_exchange',
mandatory=False,
routing_key='')])

def test_add_trail_keys(self):
Expand Down