-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathexample_counting_streams_PyProbables.py
More file actions
70 lines (62 loc) · 2.43 KB
/
example_counting_streams_PyProbables.py
File metadata and controls
70 lines (62 loc) · 2.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
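"""
This code uses PyProbables:
https://pyprobables.readthedocs.io/en/latest/index.html
Install PyProbables (``pip install pyprobables``) to use this code.
This code is a straightforward application of the HeavyHitters class in
PyProbables. Each element of the input stream is either a pair
('add', obj), which adds obj to a HeavyHitters object, or the string
'heavy_hitters', which prints the current heavy hitters (estimates of the
most frequent elements seen so far).
"""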
from probables import HeavyHitters
def f(element, heavy_hitters_object):
    """
    Apply one stream command to a HeavyHitters object.

    Parameters
    ----------
    element: str or tuple
        A command taken from the input stream. It is either the string
        'heavy_hitters', which prints the current heavy hitters of
        heavy_hitters_object, or a pair ('add', obj), which adds obj to
        heavy_hitters_object.
    heavy_hitters_object: HeavyHitters
        An instance of HeavyHitters. It is updated in place by 'add'
        commands and read by 'heavy_hitters' commands.

    Raises
    ------
    ValueError
        If element is a pair whose first item is not 'add'. A
        non-'heavy_hitters' element that cannot be unpacked into exactly
        two items also raises (ValueError for the wrong length,
        TypeError if it is not iterable).
    """
    if element == 'heavy_hitters':
        print('heavy hitters')
        print(heavy_hitters_object.heavy_hitters)
        return

    # Every other element must be a pair ('add', obj).
    function_name, obj = element
    if function_name != 'add':
        raise ValueError(
            f"unsupported stream command {function_name!r}; "
            f"expected ('add', obj) or 'heavy_hitters'")
    heavy_hitters_object.add(obj)
#---------------------------------------------------------------------
# TESTS
#---------------------------------------------------------------------
def test_heavy_hitters_stream():
    """
    Drive f over a stream of 'add' and 'heavy_hitters' commands.

    A single_item agent applies f to every element of the input stream,
    sharing one HeavyHitters object (num_hitters=2, width=2, depth=2).
    After each batch of 'add' commands, a 'heavy_hitters' command prints
    the current estimates.

    NOTE(review): width=2, depth=2 is a very small sketch, so the printed
    estimates likely include collision error. This is presumably
    intentional for the demo; confirm.
    """
    from stream import Stream, run
    from example_operators import single_item

    hitters = HeavyHitters(num_hitters=2, width=2, depth=2)

    in_stream = Stream('input_stream')
    single_item(in_stream=in_stream, func=f,
                heavy_hitters_object=hitters)

    def adds(keys):
        # One ('add', key) command per character of keys.
        return [('add', key) for key in keys]

    # A bare string, not a tuple: f matches it with ==.
    report = ['heavy_hitters']

    commands = (adds('aaabccc') + report
                + adds('abca') + report
                + adds('bcbb') + report
                + adds('b' * 8) + report
                + adds('d' * 12) + report)
    in_stream.extend(commands)
    run()
if __name__ == '__main__':
    # Run the streaming heavy-hitters demo when executed as a script.
    test_heavy_hitters_stream()