Currently, workers can only process documents: the broker gets the document information from MongoDB and passes it to the worker's wrapper, which then calls the worker's `main` function, passing the document as a parameter.
With the current approach we can't create a worker for, say, a corpus wordcloud analysis (or any other analysis that needs to process an entire corpus instead of a single document).
We could change the code a little so the broker gets an entire corpus from MongoDB and passes it to the worker's wrapper, but this simple approach has a problem: a corpus is much larger than a document (it is a collection of documents, each of them with its own analyses), so it is not a good idea to pass an entire corpus from the broker process to the worker process (`multiprocessing` uses `pickle` for this job, with temporary files to save the pickled objects).
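To make the size argument concrete, here is a rough Python sketch comparing what would be pickled in each case. The document fields (`text`, `tokens`) and the access-info keys are placeholders I made up, not the real schema; the point is only that the corpus payload grows with the number of documents while the connection information stays constant.

```python
import pickle


# Hypothetical document: each one carries its own analysis results.
# The keys 'text' and 'tokens' are illustrative assumptions.
def make_document(i):
    text = f"document number {i} " * 50
    return {"_id": i, "text": text, "tokens": text.split()}


corpus = [make_document(i) for i in range(1000)]

# What the broker would have to serialize if it sent the whole corpus...
corpus_payload = len(pickle.dumps(corpus))

# ...versus sending only the information needed to reach the data
# (placeholder values, not a real configuration).
access_info = {"host": "localhost", "port": 27017,
               "database": "example_db", "corpus_id": "some-corpus-id"}
access_payload = len(pickle.dumps(access_info))

print(f"whole corpus: {corpus_payload} bytes")
print(f"access info:  {access_payload} bytes")
```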
So the best way to do it is to get the data from MongoDB inside the worker process, but it is not good to give the worker itself access to MongoDB. I think we need a solution like this:
- The broker should pass MongoDB access information to `workers.wrapper` when the worker needs to work on a corpus (this is decided by the key `from` in the worker's `__meta__`, which is `'document'` for the current workers).
- `workers.wrapper` should connect to MongoDB, get the entire corpus lazily and pass this lazy object to the worker's `main` function.
- `workers.wrapper` should also pass corpus-specific information, as it does for documents (for example, so the worker knows the results of previous analyses: worker `freqdist` needs the key `tokens`, which is the output of worker `tokenizer`).
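A minimal sketch of the three points above, assuming a hypothetical `from = 'corpus'` value in `__meta__` and a hypothetical `requires` list naming the keys a worker needs (like `tokens` for `freqdist`). `LazyCorpus`, `wrapper`, the `corpus_id` field and the `connect` callable are illustrative names, not existing code. Any object with a `find(filter, projection)` method works as the collection; a pymongo `Collection` has that shape, and an in-memory stand-in is used here so the sketch runs without a server.

```python
from collections import Counter
from typing import Any, Callable, Dict, Iterator, List, Optional


class LazyCorpus:
    """Yields a corpus' documents on demand instead of loading them all at once.

    `collection` is anything with a `find(filter, projection)` method (the shape
    of a pymongo Collection). The 'corpus_id' field name is an assumption.
    """

    def __init__(self, collection, corpus_id: str, fields: Optional[List[str]] = None):
        self.collection = collection
        self.corpus_id = corpus_id
        self.fields = fields

    def __iter__(self) -> Iterator[Dict[str, Any]]:
        # Only request the keys the worker needs (results of earlier workers).
        projection = {field: 1 for field in self.fields} if self.fields else None
        # Nothing is read until the worker actually starts iterating.
        return iter(self.collection.find({"corpus_id": self.corpus_id}, projection))


def wrapper(worker_main: Callable, meta: Dict[str, Any], job: Dict[str, Any],
            connect: Optional[Callable[[Dict[str, Any]], Any]]) -> Any:
    """Hypothetical dispatch on the worker's __meta__['from'] key."""
    if meta["from"] == "corpus":
        # The broker sent only access information; the wrapper opens the connection,
        # so the worker's main function never talks to MongoDB itself.
        collection = connect(job["mongo"])
        corpus = LazyCorpus(collection, job["corpus_id"], meta.get("requires"))
        return worker_main(corpus)
    # Document workers keep receiving the document itself, as today.
    return worker_main(job["document"])


class InMemoryCollection:
    """Stand-in for a MongoDB collection so the sketch runs without a server."""

    def __init__(self, docs: List[Dict[str, Any]]):
        self.docs = docs

    def find(self, query: Dict[str, Any], projection: Optional[Dict[str, int]] = None):
        for doc in self.docs:
            if all(doc.get(key) == value for key, value in query.items()):
                # Like MongoDB, keep '_id' plus the projected keys.
                keep = ["_id", *projection] if projection else list(doc)
                yield {key: doc[key] for key in keep if key in doc}


def corpus_freqdist(corpus) -> Dict[str, int]:
    """Example corpus worker: token frequencies over every document."""
    counts: Counter = Counter()
    for document in corpus:
        counts.update(document["tokens"])
    return dict(counts)


docs = [
    {"_id": 1, "corpus_id": "c1", "tokens": ["a", "b", "a"]},
    {"_id": 2, "corpus_id": "c1", "tokens": ["a", "c"]},
    {"_id": 3, "corpus_id": "c2", "tokens": ["z"]},
]
meta = {"from": "corpus", "requires": ["tokens"]}
job = {"mongo": {"host": "localhost", "port": 27017}, "corpus_id": "c1"}
result = wrapper(corpus_freqdist, meta, job, connect=lambda info: InMemoryCollection(docs))
print(result)
```

With pymongo, `connect` could return something like `MongoClient(host, port)[db_name][collection_name]`, with the names left as placeholders. Since `find` returns a cursor, documents are only fetched as the worker iterates over the corpus.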
There is a problem when we allow workers to do corpus analysis: if a corpus changes (a document is added, modified or deleted), we need to re-run all the analyses. We must create a way to re-schedule the corpus pipeline when a job that adds, modifies or deletes a document from that corpus finishes (we will probably need a heuristic to avoid scheduling 100 corpus pipelines for the same corpus when we add 100 documents to it).
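One possible heuristic, sketched in Python under my own assumptions: instead of scheduling the corpus pipeline as soon as a document job finishes, record the time of the last change and schedule only once the corpus has been quiet for a while (a debounce). `CorpusRescheduler`, `document_job_finished`, `due` and the 30-second `quiet_period` are hypothetical; time is passed in explicitly so the behaviour is easy to test.

```python
from typing import Dict, List


class CorpusRescheduler:
    """Coalesces document changes into a single corpus pipeline run.

    A corpus becomes due only after `quiet_period` seconds pass without another
    change to it, so a burst of 100 added documents yields one pipeline, not 100.
    """

    def __init__(self, quiet_period: float = 30.0):
        self.quiet_period = quiet_period
        self.last_change: Dict[str, float] = {}

    def document_job_finished(self, corpus_id: str, now: float) -> None:
        # Called when an add/modify/delete job for a document of corpus_id ends;
        # a newer change simply overwrites the older timestamp.
        self.last_change[corpus_id] = now

    def due(self, now: float) -> List[str]:
        # Corpora that have been quiet long enough. They are removed from the
        # pending set, so each burst of changes is returned exactly once.
        ready = [cid for cid, t in self.last_change.items()
                 if now - t >= self.quiet_period]
        for cid in ready:
            del self.last_change[cid]
        return ready


scheduler = CorpusRescheduler(quiet_period=30.0)
for second in range(100):  # 100 documents added, one per second
    scheduler.document_job_finished("corpus-1", now=float(second))
print(scheduler.due(now=100.0))  # [] (still inside the quiet period)
print(scheduler.due(now=129.0))  # ['corpus-1'] (30 s after the last change)
```

A broker would call `due` periodically and schedule one corpus pipeline per returned id. The trade-off is latency: corpus analyses lag behind the last change by at least `quiet_period`.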
Note: maybe a map-reduce approach would be better, for example: passing each document to a `worker.map` function and then all the resulting information to `worker.reduce`.
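A rough sketch of what the map-reduce idea could look like for the wordcloud case, assuming a worker exposes `map` and `reduce` as plain functions and that documents already carry the `tokens` key. `WordcloudWorker` and `run_map_reduce` are hypothetical names.

```python
from collections import Counter
from typing import Dict, Iterable


class WordcloudWorker:
    """Hypothetical map-reduce worker: per-document counts merged per corpus."""

    @staticmethod
    def map(document: Dict) -> Counter:
        # Runs once per document, using the output of an earlier worker ('tokens').
        return Counter(token.lower() for token in document["tokens"])

    @staticmethod
    def reduce(partials: Iterable[Counter]) -> Counter:
        # Combines the per-document results into one corpus-level result.
        total: Counter = Counter()
        for part in partials:
            total.update(part)
        return total


def run_map_reduce(worker, documents: Iterable[Dict]) -> Counter:
    # Documents are consumed one at a time, so this also works with a lazy corpus.
    return worker.reduce(worker.map(doc) for doc in documents)


corpus = [{"tokens": ["NLP", "corpus", "nlp"]}, {"tokens": ["nlp", "wordcloud"]}]
print(run_map_reduce(WordcloudWorker, corpus).most_common(1))  # [('nlp', 3)]
```

One possible advantage of this split: `map` only ever sees one document, so it could run through the existing document pipeline, and only the smaller per-document results would need to reach `reduce`.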