Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ def read(relpath):
author_email='escattone@gmail.com',
url='https://github.com/escattone/txpool',
packages=['txpool'],
install_requires=['twisted>=12'],
install_requires=['twisted>=12', 'six>=1.10'],
classifiers=[
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 2 :: Only',
'Programming Language :: Python :: 3.5',
'Development Status :: 4 - Beta',
'Natural Language :: English',
'Framework :: Twisted',
Expand Down
69 changes: 36 additions & 33 deletions test/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from __future__ import absolute_import
import os
import re
import math
Expand All @@ -35,6 +36,8 @@
import txpool.pool
from txpool import Pool, cpu_count
from txpool import PoolError, PoolTimeout
from six.moves import map
from six.moves import range


@pytest.inlineCallbacks
Expand All @@ -47,7 +50,7 @@ def test_pool_1():

try:
result = yield pool.on_ready(timeout=5)
except Exception, e:
except Exception as e:
result = e
assert result is pool
assert pool.get_number_of_workers() == pool.size
Expand All @@ -69,19 +72,19 @@ def test_pool_1():

try:
result = yield pool.apply_async('math.sqrt', (-1,), timeout=5)
except Exception, e:
except Exception as e:
result = e
assert isinstance(result, PoolError)

try:
result = yield pool.apply_async(os.path.isdir, directory, timeout=5)
except Exception, e:
except Exception as e:
result = e
assert result is True

try:
result = yield pool.close(timeout=5)
except Exception, e:
except Exception as e:
result = e
assert result is pool
assert pool.get_number_of_workers() == 0
Expand All @@ -104,7 +107,7 @@ def test_pool_2(tmpdir):

try:
result = yield pool.on_ready(timeout=5)
except Exception, e:
except Exception as e:
result = e
assert result is pool
assert pool.get_number_of_workers() == pool.size
Expand All @@ -115,7 +118,7 @@ def test_pool_2(tmpdir):

try:
result = yield pool.close(timeout=5)
except Exception, e:
except Exception as e:
result = e
assert result is pool
assert pool.get_number_of_workers() == 0
Expand Down Expand Up @@ -153,14 +156,14 @@ def test_pool_3():

try:
result = yield pool.on_ready(timeout=5)
except Exception, e:
except Exception as e:
result = e
assert result is pool
assert pool.get_number_of_workers() == pool.size

try:
result = yield pool.terminate(timeout=5)
except Exception, e:
except Exception as e:
result = e
assert result is pool
assert pool.get_number_of_workers() == 0
Expand All @@ -172,7 +175,7 @@ def test_pool_3():

try:
result = yield pool.on_ready(timeout=5)
except Exception, e:
except Exception as e:
result = e
assert result is pool
assert pool.get_number_of_workers() == pool.size
Expand All @@ -181,7 +184,7 @@ def test_pool_3():

try:
result = yield pool.on_closure(timeout=5)
except Exception, e:
except Exception as e:
result = e
assert result is pool
assert pool.get_number_of_workers() == 0
Expand All @@ -197,25 +200,25 @@ def test_pool_4():

try:
result = yield pool.on_ready(timeout=5)
except Exception, e:
except Exception as e:
result = e
assert result is pool
assert pool.get_number_of_workers() == pool.size

deferreds = []
for _ in xrange(100):
for _ in range(100):
deferreds.append(pool.apply_async('os.listdir', ('....',)))

for d in deferreds:
try:
result = yield d
except Exception, e:
except Exception as e:
result = e
assert isinstance(result, PoolError)

try:
result = yield pool.close(timeout=5)
except Exception, e:
except Exception as e:
result = e
assert result is pool
assert pool.get_number_of_workers() == 0
Expand All @@ -231,14 +234,14 @@ def test_pool_5():

try:
result = yield pool.on_ready(timeout=5)
except Exception, e:
except Exception as e:
result = e
assert result is pool
assert pool.get_number_of_workers() == pool.size

deferreds = []
cancelleds = set()
args = range(1000) + range(-200, 0)
args = list(range(300)) + list(range(-200, 0))
random.shuffle(args)

for i, arg in enumerate(args):
Expand All @@ -251,7 +254,7 @@ def test_pool_5():
for arg, d in deferreds:
try:
result = yield d
except Exception, e:
except Exception as e:
result = e

if arg in cancelleds:
Expand All @@ -263,15 +266,15 @@ def test_pool_5():

try:
result = yield pool.apply_async('time.sleep', (3,), timeout=1)
except Exception, e:
except Exception as e:
result = e
assert isinstance(result, PoolTimeout)

pool.close()

try:
result = yield pool.on_closure(timeout=5)
except Exception, e:
except Exception as e:
result = e
assert result is pool
assert pool.get_number_of_workers() == 0
Expand All @@ -287,7 +290,7 @@ def test_pool_6():

try:
result = yield pool.on_ready(timeout=5)
except Exception, e:
except Exception as e:
result = e
assert result is pool
assert pool.get_number_of_workers() == pool.size
Expand All @@ -298,13 +301,13 @@ def test_pool_6():

try:
result = yield d
except Exception, e:
except Exception as e:
result = e
assert isinstance(result, CancelledError)

try:
result = yield pool.close(timeout=5)
except Exception, e:
except Exception as e:
result = e
assert result is pool
assert pool.get_number_of_workers() == 0
Expand All @@ -320,7 +323,7 @@ def test_pool_7():

try:
result = yield pool.on_ready(timeout=5)
except Exception, e:
except Exception as e:
result = e
assert result is pool
assert pool.get_number_of_workers() == pool.size
Expand All @@ -334,42 +337,42 @@ def test_pool_7():
# Try "apply_async" after closing.
try:
result = yield pool.apply_async(int, ('1011',), dict(base=2))
except Exception, e:
except Exception as e:
result = e
assert isinstance(result, PoolError)

# Try "on_ready" after closing.
try:
result = yield pool.on_ready(timeout=5)
except Exception, e:
except Exception as e:
result = e
assert isinstance(result, PoolError)

try:
result = yield pool.on_closure(timeout=5)
except Exception, e:
except Exception as e:
result = e
assert result is pool
assert pool.get_number_of_workers() == 0

# Call again after pool has closed.
try:
result = yield pool.on_closure()
except Exception, e:
except Exception as e:
result = e
assert result is pool


def test_pool_8():
try:
pool = Pool(size=0, name='test8')
except Exception, e:
except Exception as e:
pool = e
assert isinstance(pool, ValueError)

try:
pool = Pool(size=-10, name='test8')
except Exception, e:
except Exception as e:
pool = e
assert isinstance(pool, ValueError)

Expand All @@ -384,15 +387,15 @@ def test_pool_9():

try:
result = yield pool.on_ready(timeout=5)
except Exception, e:
except Exception as e:
result = e
assert result is pool
assert pool.get_number_of_workers() == pool.size

# This will timeout as we haven't called "close".
try:
result = yield pool.on_closure(timeout=2)
except Exception, e:
except Exception as e:
result = e
assert isinstance(result, PoolTimeout)

Expand All @@ -402,7 +405,7 @@ def test_pool_9():
# again.
try:
result = yield pool.on_ready(timeout=2)
except Exception, e:
except Exception as e:
result = e
assert isinstance(result, PoolTimeout)
assert pool.get_number_of_workers() == 0
Expand All @@ -416,7 +419,7 @@ def test_pool_10():

try:
result = yield pool.on_ready(timeout=5)
except Exception, e:
except Exception as e:
result = e
assert result is pool
assert pool.get_number_of_workers() == pool.size
11 changes: 3 additions & 8 deletions tox.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[tox]
envlist = py27, pypy, flake8
envlist = py27, py35, pypy, flake8

[testenv]
commands = py.test --cov={envsitepackagesdir}/txpool \
Expand All @@ -12,13 +12,8 @@ deps =
pytest-cov
pytest-twisted

[testenv:py27]
basepython = python2.7

[testenv:pypy]
basepython = pypy

[testenv:flake8]
deps = flake8
basepython = python2.7
skip_install = true
basepython = python3
commands = flake8 txpool
1 change: 1 addition & 0 deletions txpool/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# flake8: noqa
from __future__ import absolute_import
from txpool.pool import cpu_count, Pool, PoolError, PoolTimeout


Expand Down
16 changes: 9 additions & 7 deletions txpool/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,21 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

__all__ = ['Pool', 'PoolError', 'PoolTimeout', 'cpu_count']

from __future__ import absolute_import
import six
from six.moves import range

import os
import sys
import cPickle as pickle
import six.moves.cPickle as pickle
from collections import deque
from logging import INFO, ERROR
from multiprocessing import cpu_count
from twisted.protocols.basic import NetstringReceiver
from twisted.internet.protocol import ProcessProtocol
from twisted.internet.defer import succeed, fail, Deferred

__all__ = ['Pool', 'PoolError', 'PoolTimeout', 'cpu_count']

DEFAULT_POOL_SIZE = 2

Expand Down Expand Up @@ -205,7 +207,7 @@ def get_log(self):
return '\n'.join(self._log_lines or ())

def __repr__(self):
if isinstance(self.call, basestring):
if isinstance(self.call, six.string_types):
name = self.call
else:
name = repr(self.call)
Expand All @@ -215,7 +217,7 @@ def args():
for arg in self.args:
yield repr(arg)
if self.kwargs:
for item in self.kwargs.iteritems():
for item in six.iteritems(self.kwargs):
yield '%s=%r' % item

return ('<%s object at %#x: %s(%s)>' %
Expand Down Expand Up @@ -258,7 +260,7 @@ def __init__(self, pool, size=None, init_call=None, init_args=None,

self.size = size

for _ in xrange(size):
for _ in range(size):
self.start_worker()

def is_ready(self):
Expand Down Expand Up @@ -538,7 +540,7 @@ def __init__(self, call):
self.remains = ''

def add(self, text):
lines = (self.remains + text).splitlines()
lines = (self.remains + text.decode('latin-1')).splitlines()

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

text will be in bytes and is coming from the worker process' stderr (which is a different i/o channel than the one sending pickled results), and is most likely encoded as "utf-8" I believe (unless the worker process explicitly encodes its stderr as something else), so my thought is that text.decode() (using the default encoding of "utf-8") is better here.

self.remains = lines.pop(-1)

for line in lines:
Expand Down
Loading