From 2862df877052a133873ab57e4124d4f7334d57dd Mon Sep 17 00:00:00 2001 From: Flavio Amieiro Date: Thu, 16 Jun 2016 01:11:53 -0300 Subject: [PATCH 1/3] WIP: First draft of a corpus worker This is the first draft of a worker that can get a corpus and create an analysis for it. This first attempt was a freqdist worker that takes the freqdist for each document in the corpus and condenses them into a new analysis: the freqdist for the entire corpus. This is a work in progress because I was mainly concerned with the basis for this to work (especially the celery task). I did not pay any attention to the way the worker itself is working (it's probably doing more work than it needs to), and it also probably needs more tests. --- pypln/backend/celery_task.py | 32 ++++++++++++++++ pypln/backend/config.py | 2 + pypln/backend/workers/__init__.py | 3 +- pypln/backend/workers/corpus_freqdist.py | 32 ++++++++++++++++ tests/test_worker_corpus_freqdist.py | 48 ++++++++++++++++++++++++ tests/utils.py | 1 + 6 files changed, 117 insertions(+), 1 deletion(-) create mode 100644 pypln/backend/workers/corpus_freqdist.py create mode 100644 tests/test_worker_corpus_freqdist.py diff --git a/pypln/backend/celery_task.py b/pypln/backend/celery_task.py index 2d3d93d..7eef8e7 100644 --- a/pypln/backend/celery_task.py +++ b/pypln/backend/celery_task.py @@ -34,6 +34,7 @@ mongo_client = pymongo.MongoClient(host=config.MONGODB_URIS) database = mongo_client[config.MONGODB_DBNAME] document_collection = database[config.MONGODB_COLLECTION] +corpora_collection = database[config.MONGODB_CORPORA_COLLECTION] class DocumentNotFound(Exception): pass @@ -69,3 +70,34 @@ def process(self, document): and must return a dictionary with the keys to be saved in the database. """ raise NotImplementedError + +class PyPLNCorpusTask(Task): + """ + This is the base class for a Corpus task. It is very similar to + `PyPLNTask`, but it needs a corpus_id and a list of document_ids.
+ """ + + def run(self, corpus_id, document_ids): + """ + This method is called by Celery, and should not be overridden. + It will call the `process` method with a list of dictionaries + containing all the documents and will update the database with results. + """ + documents = document_collection.find({"_id": {"$in": document_ids}}) + if documents is None: + self.retry(exc=DocumentNotFound('Documents with ids "{}" ' + 'not found in database'.format(document_ids))) + result = self.process(documents) + corpora_collection.update({"corpus_id": corpus_id}, {"$set": result}, + upsert=True) + return corpus_id, document_ids + + def process(self, documents): + """ + This method should be implemented by subclasses. It is responsible for + performing the analysis itself. It will receive a list of dictionaries + as a parameter (containing all the documents and the analyses that are + ready for it) and must return a dictionary with the new keys to be + saved in the corpora analysis collection. + """ + raise NotImplementedError diff --git a/pypln/backend/config.py b/pypln/backend/config.py index ec1d48e..bb708bc 100644 --- a/pypln/backend/config.py +++ b/pypln/backend/config.py @@ -31,6 +31,8 @@ def split_uris(uri): cast=split_uris) MONGODB_DBNAME = config('MONGODB_DBNAME', default='pypln') MONGODB_COLLECTION = config('MONGODB_COLLECTION', default='analysis') +MONGODB_CORPORA_COLLECTION = config('MONGODB_COLLECTION', + default='corpora_analysis') ELASTICSEARCH_CONFIG = { 'hosts': config('ELASTICSEARCH_HOSTS', diff --git a/pypln/backend/workers/__init__.py b/pypln/backend/workers/__init__.py index 0125bde..a755fc4 100644 --- a/pypln/backend/workers/__init__.py +++ b/pypln/backend/workers/__init__.py @@ -29,8 +29,9 @@ from palavras_semantic_tagger import SemanticTagger from word_cloud import WordCloud from elastic_indexer import ElasticIndexer +from corpus_freqdist import CorpusFreqDist __all__ = ['Extractor', 'Tokenizer', 'FreqDist', 'POS', 'Statistics', 'Bigrams',
'PalavrasRaw', 'Lemmatizer', 'NounPhrase', 'SemanticTagger', - 'WordCloud', 'ElasticIndexer'] + 'WordCloud', 'ElasticIndexer', 'CorpusFreqDist'] diff --git a/pypln/backend/workers/corpus_freqdist.py b/pypln/backend/workers/corpus_freqdist.py new file mode 100644 index 0000000..217aceb --- /dev/null +++ b/pypln/backend/workers/corpus_freqdist.py @@ -0,0 +1,32 @@ +# coding: utf-8 +# +# Copyright 2012 NAMD-EMAP-FGV +# +# This file is part of PyPLN. You can get more information at: http://pypln.org/. +# +# PyPLN is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# PyPLN is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with PyPLN. If not, see . +from pypln.backend.celery_task import PyPLNCorpusTask + +from collections import Counter + +class CorpusFreqDist(PyPLNCorpusTask): + + def process(self, documents): + result = Counter() + for document in documents: + d = {} + for word, count in document['freqdist']: + d[word] = count + result += Counter(d) + return {'freqdist': result.most_common()} diff --git a/tests/test_worker_corpus_freqdist.py b/tests/test_worker_corpus_freqdist.py new file mode 100644 index 0000000..1376064 --- /dev/null +++ b/tests/test_worker_corpus_freqdist.py @@ -0,0 +1,48 @@ +# coding: utf-8 +# +# Copyright 2012 NAMD-EMAP-FGV +# +# This file is part of PyPLN. You can get more information at: http://pypln.org/. 
+# +# PyPLN is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# PyPLN is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with PyPLN. If not, see . +from pypln.backend.workers import CorpusFreqDist +from utils import TaskTest + + +class TestCorpusFreqDistWorker(TaskTest): + def test_freqdist_should_return_a_list_of_tuples_with_frequency_distribution(self): + + freqdist_1 = [[u'is', 2], [u'the', 2], [u'blue', 1], [u'sun', 1], + [u'sky', 1], [u',', 1], [u'yellow', 1], [u'.', 1]] + + freqdist_2 = [[u'the', 2], [u'brown', 1], [u'lazy', 1], + [u'over', 1], [u'fox', 1], [u'dog', 1], [u'.', 1], + [u'quick', 1], [u'jumps', 1]] + + corpus_fd = [[u'the', 4], [u'is', 2], [u'.', 2], [u'blue', 1], + [u'brown', 1], [u'lazy', 1], [u'fox', 1], [u'jumps', 1], + [u'sun', 1], [u'dog', 1], [u'sky', 1], [u',', 1], + [u'yellow', 1], [u'quick', 1], [u'over', 1]] + + doc_id_1 = self.collection.insert({'freqdist': freqdist_1}, w=1) + doc_id_2 = self.collection.insert({'freqdist': freqdist_2}, w=1) + fake_corpus_id = 1 + + + CorpusFreqDist().delay(fake_corpus_id, [doc_id_1, doc_id_2]) + + resulting_corpus_fd = self.corpora_collection.find_one( + {'corpus_id': fake_corpus_id})['freqdist'] + + self.assertEqual(resulting_corpus_fd, corpus_fd) diff --git a/tests/utils.py b/tests/utils.py index a2168c2..a8f4845 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -34,6 +34,7 @@ def setUp(self): app.conf.update(CELERY_ALWAYS_EAGER=True) self.db = pymongo.Connection()[self.db_name] self.collection = self.db[config.MONGODB_COLLECTION] + self.corpora_collection 
= self.db[config.MONGODB_CORPORA_COLLECTION] def tearDown(self): self.collection.remove({}) From 71ca77dbdf9a48ed9f6cc41908b4308035a95f21 Mon Sep 17 00:00:00 2001 From: Flavio Amieiro Date: Tue, 12 Jul 2016 12:25:27 -0300 Subject: [PATCH 2/3] Fixes Mongo corpora analysis collection configuration --- pypln/backend/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pypln/backend/config.py b/pypln/backend/config.py index bb708bc..e6250b2 100644 --- a/pypln/backend/config.py +++ b/pypln/backend/config.py @@ -31,7 +31,7 @@ def split_uris(uri): cast=split_uris) MONGODB_DBNAME = config('MONGODB_DBNAME', default='pypln') MONGODB_COLLECTION = config('MONGODB_COLLECTION', default='analysis') -MONGODB_CORPORA_COLLECTION = config('MONGODB_COLLECTION', +MONGODB_CORPORA_COLLECTION = config('MONGODB_CORPORA_COLLECTION', default='corpora_analysis') ELASTICSEARCH_CONFIG = { From 2be0f7ecd144f950b53d0b6c9976eee08de4ced7 Mon Sep 17 00:00:00 2001 From: Flavio Amieiro Date: Thu, 11 Aug 2016 10:15:13 -0300 Subject: [PATCH 3/3] Separates the generic PyPLNCorpusTask tests from the CorpusFreqdist tests Thanks @geron for pointing out that I was testing everything together --- tests/test_celery_corpus_task.py | 63 ++++++++++++++++++++++++++++ tests/test_worker_corpus_freqdist.py | 19 ++++----- 2 files changed, 70 insertions(+), 12 deletions(-) create mode 100644 tests/test_celery_corpus_task.py diff --git a/tests/test_celery_corpus_task.py b/tests/test_celery_corpus_task.py new file mode 100644 index 0000000..c37d22a --- /dev/null +++ b/tests/test_celery_corpus_task.py @@ -0,0 +1,63 @@ +# coding: utf-8 +# +# Copyright 2015 NAMD-EMAP-FGV +# +# This file is part of PyPLN. You can get more information at: http://pypln.org/. +# +# PyPLN is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. 
+# +# PyPLN is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with PyPLN. If not, see . +from pypln.backend.celery_task import PyPLNCorpusTask +from mock import MagicMock +from utils import TaskTest + + +class FakeCorpusTask(PyPLNCorpusTask): + def process(self, documents): + return {'result': sum([d["input"] for d in documents])} + +class TestCeleryCorpusTask(TaskTest): + def test_task_should_only_get_the_correct_documents(self): + # This is just preparing the expected input in the database + wrong_doc_id = self.collection.insert({'input': 999}, w=1) + correct_doc_id_1 = self.collection.insert({'input': 1}, w=1) + correct_doc_id_2 = self.collection.insert({'input': 1}, w=1) + fake_corpus_id = 1 + + FakeCorpusTask.process = MagicMock(return_value={'result': 2}) + + corpus_task = FakeCorpusTask() + + corpus_task.delay(fake_corpus_id, [correct_doc_id_1, correct_doc_id_2]) + + corpus_task.process.assert_called() + + # We need to compare the call args because it's called with a mongo + # cursor, not a list. + # We're getting [0][0] because we want the args (not kwargs) for the + # first call to the method. 
+ call_args = list(corpus_task.process.call_args[0][0]) + for arg in call_args: + self.assertEqual(arg['input'], 1) + + def test_task_is_saving_the_result_to_mongo_with_the_corpus_id(self): + expected_result = 42 + doc_id_1 = self.collection.insert({'input': 21}, w=1) + doc_id_2 = self.collection.insert({'input': 21}, w=1) + fake_corpus_id = 1 + + FakeCorpusTask().delay(fake_corpus_id, [doc_id_1, doc_id_2]) + + resulting_corpus_analysis = self.corpora_collection.find_one( + {'corpus_id': fake_corpus_id})['result'] + + self.assertEqual(resulting_corpus_analysis, expected_result) diff --git a/tests/test_worker_corpus_freqdist.py b/tests/test_worker_corpus_freqdist.py index 1376064..68fc0c9 100644 --- a/tests/test_worker_corpus_freqdist.py +++ b/tests/test_worker_corpus_freqdist.py @@ -30,19 +30,14 @@ def test_freqdist_should_return_a_list_of_tuples_with_frequency_distribution(sel [u'over', 1], [u'fox', 1], [u'dog', 1], [u'.', 1], [u'quick', 1], [u'jumps', 1]] - corpus_fd = [[u'the', 4], [u'is', 2], [u'.', 2], [u'blue', 1], - [u'brown', 1], [u'lazy', 1], [u'fox', 1], [u'jumps', 1], - [u'sun', 1], [u'dog', 1], [u'sky', 1], [u',', 1], - [u'yellow', 1], [u'quick', 1], [u'over', 1]] + corpus_fd = [(u'the', 4), (u'is', 2), (u'.', 2), (u'blue', 1), + (u'brown', 1), (u'lazy', 1), (u'fox', 1), (u'jumps', 1), + (u'sun', 1), (u'dog', 1), (u'sky', 1), (u',', 1), + (u'yellow', 1), (u'quick', 1), (u'over', 1)] - doc_id_1 = self.collection.insert({'freqdist': freqdist_1}, w=1) - doc_id_2 = self.collection.insert({'freqdist': freqdist_2}, w=1) - fake_corpus_id = 1 + result = CorpusFreqDist().process([{'freqdist': freqdist_1}, + {'freqdist': freqdist_2}]) - - CorpusFreqDist().delay(fake_corpus_id, [doc_id_1, doc_id_2]) - - resulting_corpus_fd = self.corpora_collection.find_one( - {'corpus_id': fake_corpus_id})['freqdist'] + resulting_corpus_fd = result['freqdist'] self.assertEqual(resulting_corpus_fd, corpus_fd)