-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfile_processing_worker.py
More file actions
80 lines (74 loc) · 2.5 KB
/
file_processing_worker.py
File metadata and controls
80 lines (74 loc) · 2.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import threading
import asyncio
from zipfile import ZipFile
from zipfile import ZipInfo
import json
import serviceConstants as const
from botocore.exceptions import ClientError
import os
from queue import Queue
# Name of the S3 bucket the workers read from and write to. It is taken from
# the environment variable whose name is stored in const.BUCKET_NAME_KEY; the
# subscript lookup raises KeyError at import time if that variable is unset.
BUCKET_NAME = os.environ[const.BUCKET_NAME_KEY]
# Upper bound on how many queue entries a worker schedules in one asyncio batch.
MAX_PROCESSING_BATCH_SIZE = 10
class file_processing_worker(threading.Thread):
    """Worker thread that syncs JSON files from a zip archive into S3.

    The worker pulls ``ZipInfo`` entries from a queue, compares the archived
    JSON document with the object stored under the same key in
    ``BUCKET_NAME``, and uploads the archived version when the object is
    missing or its parsed content differs.
    """

    def __init__(
            self,
            threadID,
            name,
            queue: Queue,
            zip: ZipFile,
            s3
    ):
        """Store the collaborators this worker operates on.

        Args:
            threadID: Caller-supplied identifier, stored as-is.
            name: Thread name; also used as the prefix of log lines.
            queue: Queue of ``ZipInfo`` entries still to be processed
                (presumably shared with sibling workers -- confirm at the
                call site).
            zip: Open archive the queued entries belong to.
            s3: Object exposing ``Object(bucket, key)`` whose result offers
                ``get()`` and ``put(Body=...)`` (looks like a boto3 S3
                service resource -- confirm at the call site).
        """
        super().__init__()
        self.threadID = threadID
        self.name = name
        self.zip = zip
        self.s3 = s3
        self.queue = queue

    def run(self) -> None:
        """Drain the queue on a private event loop, then report completion."""
        loop: asyncio.AbstractEventLoop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(self.waitForProcessingCompletion(loop))
        finally:
            # Close the loop even when processing raised, so it is not leaked.
            loop.close()
        print("%s is finished processing!" % self.name)

    async def waitForProcessingCompletion(
            self,
            loop: asyncio.AbstractEventLoop) -> None:
        """Process queued files in batches until the queue is exhausted.

        Args:
            loop: Event loop on which the per-file tasks are created.

        Each batch holds at most ``MAX_PROCESSING_BATCH_SIZE`` tasks.
        NOTE: ``processFactsFile`` contains no ``await``, so the tasks of a
        batch execute one after another on the loop rather than overlapping.
        """
        from queue import Empty

        while True:
            batch: list = []
            for _ in range(MAX_PROCESSING_BATCH_SIZE):
                # get_nowait() instead of empty()+get(): if another consumer
                # takes the last entry between the check and the get, a
                # blocking get() would hang this worker forever.
                try:
                    entry = self.queue.get_nowait()
                except Empty:
                    break
                batch.append(loop.create_task(self.processFactsFile(entry)))

            if not batch:
                # Nothing left to do; also avoids awaiting an empty task set.
                return

            # return_exceptions=True keeps one failing file from aborting the
            # rest of the batch, while still surfacing the failure below
            # instead of leaving the task's exception unretrieved.
            outcomes = await asyncio.gather(*batch, return_exceptions=True)
            for outcome in outcomes:
                if isinstance(outcome, BaseException):
                    print("%s: Unhandled error while processing a file: %r"
                          % (self.name, outcome))

    async def processFactsFile(
            self,
            file: ZipInfo) -> None:
        """Upload one archived JSON file to S3 if it is new or has changed.

        Args:
            file: Archive entry to process; its ``filename`` doubles as the
                S3 object key.

        The archived document and the stored object are compared after JSON
        parsing. Decode failures and S3 errors other than ``NoSuchKey`` are
        reported and the file is skipped without uploading.
        """
        # Drop the last five characters of the name for log output
        # (presumably the ".json" extension -- confirm against the archive).
        cik = file.filename[:-5]

        try:
            archived = json.loads(self.zip.read(file))
        except json.decoder.JSONDecodeError:
            print("%s: Cannot process %s" % (self.name, cik))
            return

        remote = self.s3.Object(BUCKET_NAME, file.filename)
        try:
            stored = json.loads(remote.get()['Body'].read().decode())
        except ClientError as ex:
            if ex.response['Error']['Code'] != 'NoSuchKey':
                # Unknown S3 failure: report it and leave the object alone.
                print(ex)
                return
            print("%s: Adding %s..." % (self.name, cik))
        except json.decoder.JSONDecodeError:
            print("%s: Cannot process %s" % (self.name, cik))
            return
        else:
            if archived == stored:
                print("%s: %s is up to date!" % (self.name, cik))
                return
            print("%s: Updating %s..." % (self.name, cik))

        remote.put(Body=json.dumps(archived))