examples_element_wrapper.py
"""This module contains examples of the 'element' wrapper.
A function that takes a single element of an input stream
and generates a single element of an output stream is an
example of a function that is wrapped by the 'element'
wrapper to create a function on streams.
The module has the following parts:
(1) op or simple operator:
        single input, single output
(2) sink:
        single input, no outputs
(3) source:
        no input, multiple outputs
(4) split:
        single input, multiple output
(5) merge:
        multiple input, single output
(6) general case:
        multiple input, multiple output
All of the first 5 cases are specializations of
the general case; however, the syntactic sugar they
provide can be helpful.
For each of the above cases we first consider agents
that are stateless and then consider agents with state.
"""
## Commenting this section that appends everything on the path
## if __name__ == '__main__':
## if __package__ is None:
## import sys
## from os import path
## sys.path.append( path.dirname( path.dirname( path.abspath(__file__) ) ) )
from Stream import Stream, _no_value, _multivalue
from Operators import stream_func, stream_agent
import json
import numpy as np
#######################################################
# PART 1
# SINGLE INPUT STREAM, SINGLE OUTPUT STREAM.
#######################################################
#______________________________________________________
# PART 1A: Stateless
#______________________________________________________
# Single input, single output, stateless functions
# Inputs to the functions:
# element : object
# element of the input stream
# Returned by the functions:
# element : object
# element to be placed on the output stream.
#______________________________________________________
# EXAMPLE 1
#
# SPECIFICATION:
# Write a function, square_stream, that has a single
# input stream and that returns a stream whose elements
# are the squares of the elements of its input
# stream.
# If y = square_stream(x)
# and x is a stream with initial values [1, 3, 5, ...] then
# y must be a stream with initial values [1, 9, 25, ...]
# See main()
#
# HOW TO DEVELOP THE STREAMING PROGRAM.
# First step:
# Write a function that returns squares of
# its single input value.
def square(v): return v*v
# Second step:
# Wrap the above function, square, using wrapping
# function stream_func to obtain the desired
# function square_stream.
def square_stream(stream):
    return stream_func(
        inputs=stream, f_type='element', f=square, num_outputs=1)
# stream_func is the wrapper.
# f_type specifies how the function f is to be wrapped.
# The initial examples have f_type='element' which
# assumes that f operates on single elements of input streams
# and produces single elements of output streams.
# num_outputs is the number of output streams. We begin
# with examples in which num_outputs=1.
# inputs is set to the parameter of square_stream, and
# so inputs=stream.
# In this initial set of examples, the function has
# a single input stream, so inputs is a single stream.
# You can also obtain a stream y1 that squares elements of
# a stream x using stream_func directly, without defining
# the function square_stream first:
# y1 = stream_func(
# inputs=x, f_type='element', f=square, num_outputs=1)
# See main()
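#
# A rough plain-Python picture of what f_type='element' does, using
# lists in place of streams. The helper apply_to_each is hypothetical
# and is not part of the Stream or Operators modules; it only
# illustrates that the wrapped function is applied to one input
# element at a time.

```python
def square(v):
    return v * v


def apply_to_each(f, values):
    # Hypothetical stand-in for the 'element' wrapper on a finite list.
    return [f(v) for v in values]


x_values = [1, 3, 5]
y_values = apply_to_each(square, x_values)   # [1, 9, 25]
```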
#
# EXAMPLE 2
#
# SPECIFICATION:
# Write a function, double_stream, that has a single
# input stream and that returns a stream whose elements
# are twice the values of the elements of its input
# stream.
#
# If z = double_stream(x)
# and x is a stream with initial values [1, 3, 5, ...]
# then z must be a stream with initial values [2, 6, 10, ...]
# See main()
#
# HOW TO DEVELOP THE STREAMING PROGRAM.
# First step:
# Write a function double that returns the double of
# its single input value.
def double(v): return 2*v
# Second step:
# Wrap the above function, double, to obtain
# the desired function double_stream.
def double_stream(stream):
    return stream_func(
        inputs=stream, f_type='element', f=double, num_outputs=1)
# We could also have obtained the desired stream z1
# using stream_func directly,
# z1 = stream_func(
# inputs=x, f_type='element', f=double, num_outputs=1)
# EXAMPLE 3
# Example of function composition.
# Generate a stream w1 that doubles the squares of the elements
# of stream x
# w1 = double_stream(square_stream(x))
# If x is a stream [1, 3, 5, ...] then
# w1 is a stream [2, 18, 50, ...]
#
# Generate a stream w2 that squares twice the elements
# of stream x
# w2 = square_stream(double_stream(x))
# If x is a stream [1, 3, 5, ...] then
# w2 is a stream [4, 36, 100, ...]
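#
# The order of composition matters. A minimal check of the two
# results above, computed on a plain list rather than on streams
# (the list x_values is an assumption standing in for the initial
# values of stream x):

```python
def square(v):
    return v * v


def double(v):
    return 2 * v


x_values = [1, 3, 5]
# Square first, then double.
w1_values = [double(square(v)) for v in x_values]   # [2, 18, 50]
# Double first, then square.
w2_values = [square(double(v)) for v in x_values]   # [4, 36, 100]
```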
# EXAMPLE 4
# Illustrating use of _no_value.
# SPECIFICATION:
# Write a function, discard_odds, that has a single
# input stream and that returns a single stream whose elements
# are the same as its input stream except that odd
# numbers are discarded.
#
# If v = discard_odds(x)
# and x is a stream [1, 2, 3, 4, 5, 6, ...] then
# v must be the stream [2, 4, 6,.....]
#
# HOW TO DEVELOP THE STREAMING PROGRAM.
# First step:
# Write a function with a single input that returns
# its single input value if the input value is even
# and returns _no_value otherwise.
def even(v):
    if not v%2: return v
    else: return _no_value
#
# _no_value is a special object that is not placed
# in an output stream.
#
# Second step:
# Wrap the above function, even, to obtain
# the desired function discard_odds.
def discard_odds(stream):
    return stream_func(
        inputs=stream, f_type='element', f=even, num_outputs=1)
# Explanation of need for _no_value
#
# Consider the following example that returns None rather than
# _no_value for odd numbers.
def even_1(v):
    if not v%2: return v
    else: return None

def discard_odds_1(stream):
    return stream_func(
        inputs=stream, f_type='element', f=even_1, num_outputs=1)
#
# If v_1 = discard_odds_1(x) and
# x is a stream [1, 2, 3, 4, ...] then
# v_1 is a stream [None, 2, None, 4, ...] which is not the
# same as [2, 4, ...]
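#
# The difference between a sentinel and None can be sketched with
# plain lists. NO_VALUE and apply_and_filter below are hypothetical
# stand-ins for _no_value and the wrapper; they are not the library's
# own definitions. Only the sentinel is dropped, so None survives as
# an ordinary value.

```python
NO_VALUE = object()   # hypothetical sentinel, standing in for _no_value


def even_or_sentinel(v):
    return v if v % 2 == 0 else NO_VALUE


def even_or_none(v):
    return v if v % 2 == 0 else None


def apply_and_filter(f, values):
    # Apply f to each value and drop results that are the sentinel.
    results = [f(v) for v in values]
    return [r for r in results if r is not NO_VALUE]


x_values = [1, 2, 3, 4]
with_sentinel = apply_and_filter(even_or_sentinel, x_values)
with_none = apply_and_filter(even_or_none, x_values)
```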
# EXAMPLE 5
# Illustrating use of _multivalue.
# If Python function f returns _multivalue(l)
# where l is a list then the agent appends each
# element of l to the agent's output stream.
# For example, if f returns _multivalue([3, 4])
# then 3 and then 4 will be appended to the
# agent's output stream.
# Note that if f returns [3, 4] then the list
# [3, 4] will be appended to the agent's output
# stream; this is not the same as appending 3 and
# then 4.
# SPECIFICATION:
# Write a function evens_and_halves with a single
# input stream that returns a single stream in
# which odd values in the input stream are discarded
# and even values and half their values appear in
# the output stream.
#
# If u = evens_and_halves(x) and
# x is a stream [1, 2, 3, 4, 5, 6, ..] then
# u must be the stream [2, 1, 4, 2, 6, 3, .....]
# HOW TO DEVELOP THE STREAMING PROGRAM.
# First step:
# Write a function with a single input v and that
# returns _multivalue([v, v/2]) if v is even
# and returns _no_value otherwise.
def even2(v):
    # v//2 keeps the halves as integers, as in the specification.
    if not v%2: return _multivalue([v, v//2])
    else: return _no_value
# Second step:
# Wrap the above function, even2, to obtain
# the desired function evens_and_halves.
def evens_and_halves(stream):
    return stream_func(
        inputs=stream, f_type='element', f=even2, num_outputs=1)
# Illustration of the need for _multivalue
# As a contrast to even2 consider the following:
def even3(v):
    # v//2 keeps the halves as integers, as in the specification.
    if not v%2: return [v, v//2]
    else: return _no_value

def evens_and_halves_3(stream):
    return stream_func(
        inputs=stream, f_type='element', f=even3, num_outputs=1)
# If t = evens_and_halves_3(x)
# and x is a stream [1, 2, 3, 4, 5, 6, ..] then
# t is the stream [[2, 1], [4, 2], [6, 3], .....] which is
# different from [2, 1, 4, 2, 6, 3, ...]
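#
# A sketch of the distinction, on plain lists. MultiValue and
# flatten_marked are hypothetical names used only for illustration;
# they are not the library's _multivalue implementation. A marked
# result is spliced into the output, while a bare list is appended
# as a single element.

```python
class MultiValue(object):
    # Hypothetical marker class, standing in for _multivalue.
    def __init__(self, values):
        self.values = list(values)


def flatten_marked(results):
    output = []
    for r in results:
        if isinstance(r, MultiValue):
            output.extend(r.values)   # splice each element in
        else:
            output.append(r)          # keep the object whole
    return output


evens = [2, 4, 6]
marked = flatten_marked([MultiValue([v, v // 2]) for v in evens])
unmarked = flatten_marked([[v, v // 2] for v in evens])
```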
# EXAMPLE 6
# Illustrating use of local variables.
# SPECIFICATION:
# Write a function multiply_elements_in_stream
# with two input parameters:
# (1) stream: a stream of numbers and
# (2) multiplier: a number
# The function returns a single stream whose
# elements are multiplier times the corresponding
# elements of the input stream.
# If s = multiply_elements_in_stream(stream=x, multiplier=3)
# and x is a stream [1, 2, 3, 4, ...] then
# s must be the stream [3, 6, 9, 12,...]
# HOW TO DEVELOP THE STREAMING PROGRAM.
# First step:
# Write a function mult(v) that returns multiplier times
# v, where multiplier is a constant specified outside
# mult.
# def mult(v): return multiplier*v
# Second step:
# Wrap the function, mult, to obtain
# the desired function multiply_elements_in_stream.
# Define mult inside the definition of
# multiply_elements_in_stream so that the parameter
# multiplier is available to function mult.
def multiply_elements_in_stream(stream, multiplier):
    def mult(v): return multiplier*v
    return stream_func(
        inputs=stream, f_type='element', f=mult, num_outputs=1)
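#
# The nested definition works because mult is a closure: it keeps
# access to multiplier after the enclosing function returns. A
# minimal sketch without streams (make_multiplier is a hypothetical
# helper, not part of this module):

```python
def make_multiplier(multiplier):
    def mult(v):
        # multiplier is read from the enclosing function's scope.
        return multiplier * v
    return mult


triple = make_multiplier(3)
s_values = [triple(v) for v in [1, 2, 3, 4]]
```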
# EXAMPLE 7
# Another example illustrating use of local variables.
# SPECIFICATION:
# Write a function boolean_of_values_greater_than_threshold
# with two input parameters:
# (1) stream: a stream of numbers and
# (2) threshold: a number
# The function returns a single stream whose
# elements are True if the corresponding
# elements of the input stream exceed threshold and
# are False otherwise.
# If
# r = boolean_of_values_greater_than_threshold(stream=x, threshold=4)
# and x is a stream [1, 20, 31, 4, ...] then
# r must be the stream [False, True, True, False,...]
# HOW TO DEVELOP THE STREAMING PROGRAM.
# First step:
# Write a function value_greater_than_threshold(value)
# that returns True if value exceeds threshold
# where threshold is a constant specified outside
# value_greater_than_threshold.
# def value_greater_than_threshold(value):
# return value > threshold
# Second step:
# Wrap the function, value_greater_than_threshold, to
# obtain the desired function,
# boolean_of_values_greater_than_threshold.
# Define value_greater_than_threshold inside the definition
# of boolean_of_values_greater_than_threshold so that the
# parameter threshold is available to function
# value_greater_than_threshold.
def boolean_of_values_greater_than_threshold(stream, threshold):
    def value_greater_than_threshold(value):
        return value > threshold
    return stream_func(
        inputs=stream, f_type='element',
        f=value_greater_than_threshold, num_outputs=1)
#______________________________________________________
# PART 1B: Stateful
#______________________________________________________
# Single input, single output, stateful functions
# Inputs to the functions:
# element : object
# element of the input stream
# state : state of the agent before the transition
# Returned by the functions:
# element : object
# element to be placed on the output stream.
# state : object
# The next state of the agent.
#
# The form of the wrapper in these examples is:
# stream_func(
# inputs=stream, # The name of the input stream
# f_type='element',
# f=g, # The name of the function that is wrapped
# num_outputs=1, # The number of outputs
# state=initial_state # specifies the initial state
# )
#______________________________________________________
# EXAMPLE 1
# An example illustrating state.
# SPECIFICATION:
# Write a function cumulative_stream with a single
# input stream and that returns a single stream where
# the j-th element of the output stream is the sum
# of the first j elements of its input stream.
# If b = cumulative_stream(stream=x)
# and x is a stream [1, 2, 3, 4, ...] then
# b must be the stream [1, 3, 6, 10, ....]
# HOW TO DEVELOP THE STREAMING PROGRAM.
# First step:
# Write a function cumulative_sum given below.
def cumulative_sum(v, cumulative):
    """
    Parameters
    ----------
    v : number
        The next element of the input stream of the
        agent.
    cumulative: number
        The state of the agent. The state is the sum
        of all the values received on the agent's
        input stream.

    Returns
    -------
    (cumulative, cumulative)
    cumulative : number
        The state after the transition, i.e., the
        sum of values received on the agent's input
        stream including the value received in this
        transition.

    """
    cumulative += v
    return (cumulative, cumulative)
# Second step:
# Wrap the function, cumulative_sum, to
# obtain the desired function, cumulative_stream.
# Since the function has state, the wrapper specifies
# the initial state: state=0.
def cumulative_stream(stream):
    return stream_func(
        inputs=stream,
        f_type='element',
        f=cumulative_sum,
        num_outputs=1,
        state=0 # The initial state
        )
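#
# A sketch of how a stateful element function is driven, using a
# list in place of a stream. run_with_state is a hypothetical helper
# and not the library's wrapper; it assumes, as in the examples of
# this part, that f returns (output_element, next_state).

```python
def cumulative_sum(v, cumulative):
    cumulative += v
    return (cumulative, cumulative)


def run_with_state(f, values, state):
    # Hypothetical stand-in for a stateful 'element' wrapper.
    outputs = []
    for v in values:
        # Each call consumes one element and the current state, and
        # yields one output element and the next state.
        output, state = f(v, state)
        outputs.append(output)
    return outputs


b_values = run_with_state(cumulative_sum, [1, 2, 3, 4], 0)
```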
# EXAMPLE 2
# Another example illustrating state.
# SPECIFICATION:
# Write a function average_stream that has a
# single input stream and returns a stream where
# the j-th element of the output stream is the average
# of the first j elements of its input stream.
# If c = average_stream(stream=x)
# and x is a stream [1, 2, 3, 4, ...] then
# c must be the stream [1.0, 1.5, 2.0, 2.5, ....]
# HOW TO DEVELOP THE STREAMING PROGRAM.
# First step:
# Write a function average given below.
def average(v, state):
    """
    Parameters
    ----------
    v : number
        The next element of the input stream of the
        agent.
    state: (n, cumulative)
        The state of the agent where
        n : number
            The number of elements that the agent has
            received on its input stream so far.
        cumulative : number
            The sum of the values that the agent has
            received on its input stream.

    Returns
    -------
    (mean, state)
    mean : floating point number
        The average of the values received so far by
        the agent
    state : (n, cumulative)
        The new state of the agent.

    """
    n, cumulative = state
    n += 1
    cumulative += v
    mean = cumulative/float(n)
    state = (n, cumulative)
    return (mean, state)
# Second step:
# Wrap the function, average, to
# obtain the desired function, average_stream.
# Since this function has a state, the wrapper
# specifies the initial state: state=(0,0.0).
def average_stream(stream):
    return stream_func(
        inputs=stream,
        f_type='element',
        f=average,
        num_outputs=1,
        state=(0, 0.0) # The initial state
        # Initially n = 0, cumulative = 0.0
        )
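#
# A hand trace of the state transitions for the input [1, 2, 3, 4],
# starting from the initial state (0, 0.0). The loop below is only
# an illustration on a plain list; it is not how the wrapper is
# implemented.

```python
def average(v, state):
    n, cumulative = state
    n += 1
    cumulative += v
    return (cumulative / float(n), (n, cumulative))


state = (0, 0.0)
means = []
states = []
for v in [1, 2, 3, 4]:
    mean, state = average(v, state)
    means.append(mean)     # element placed on the output
    states.append(state)   # state after this transition
```

# After the loop, means holds the running averages and states holds
# the successive (n, cumulative) pairs.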
#######################################################
# PART 2: SINKS
# ONE OR MORE INPUT STREAMS, NO OUTPUT STREAMS.
#######################################################
#______________________________________________________
# PART 2A: Stateless
# Single input, no output, stateless functions
# Inputs to the functions:
# element : object
# element of the input stream
# Returned by the functions:
# None
#
# The form of the wrapper in these examples is:
# stream_func(
# inputs=stream, # The name of the input stream
# f_type='element',
# f=g, # The name of the function that is wrapped
# num_outputs=0 # The number of outputs
# )
#______________________________________________________
# EXAMPLE 1
# SPECIFICATION:
# Write a function print0 that has a single input
# stream and that returns None. The function
# prints the values of its input stream.
# If the input stream has initial values [3, 5, ...]
# and the stream name is 'x' then the function should
# print:
# x : 3
# x : 5
# HOW TO DEVELOP THE STREAMING PROGRAM.
# First step:
# Write a function p0 with a single parameter, v, where
# p0 prints the name of a stream (specified outside p0)
# and the value of v.
# def p0(v):
#     print('{0} : {1}'.format(stream.name, v))
# Second step:
# Wrap the function, p0, to obtain the desired function,
# print0. Define p0 inside the definition
# of print0 so that p0 can access the parameter,
# stream of print0.
# This function doesn't have to keep track of its past,
# and so it has no state. Therefore, the wrapper
# stream_func does not specify a state.
def print0(stream):
    def p0(v):
        # The parenthesized form behaves the same in Python 2 and 3.
        print('{0} : {1}'.format(stream.name, v))
    return stream_func(
        inputs=stream, f_type='element',
        f=p0, num_outputs=0)
#______________________________________________________
# PART 2B: Stateful
# Single input, no output, with state
# EXAMPLE 2
# Illustrates state.
# SPECIFICATION:
# Write a function print_stream that has a single input
# stream and that returns None. The function
# prints the values with the indexes of its input stream.
# If the input stream has initial values [3, 5, ...]
# and the stream name is 'x' then the function should
# print:
# x[0] = 3
# x[1] = 5
# HOW TO DEVELOP THE STREAMING PROGRAM.
# First step:
# The function has to keep track of the number of values
# that have already been printed; so we need a state.
# Write a function print_element with two parameters, v,
# the value in a stream, and count --- the number of elements
# printed so far. The function returns the new state, i.e.,
# the new value of count. The function uses a variable stream
# specified outside the function.
# def print_element(v, count):
#     print('{0}[{1}] = {2}'.format(stream.name, count, v))
#     return (count+1)
# Second step:
# Wrap the function, print_element, to obtain the desired function,
# print_stream. Define print_element inside the definition
# of print_stream so that print_element can access the parameter,
# stream of print_stream.
# This function has a state, namely count, and so the wrapper
# specifies its initial value: 'state=0'.
def print_stream(stream):
    def print_element(v, count):
        # The parenthesized form behaves the same in Python 2 and 3.
        print('{0}[{1}] = {2}'.format(stream.name, count, v))
        return (count+1)
    return stream_func(
        inputs=stream, f_type='element',
        f=print_element, num_outputs=0,
        state=0)
# EXAMPLE 3
# SPECIFICATION:
# Write a function stream_to_file that has two
# parameters, a single input stream and a filename.
# The function returns None. The function
# appends the json representations of the values
# of its input stream into the file with name filename.
# If the input stream has initial values [3, 5, ...]
# and file is initially empty then the file should
# have the json representations of 3 and 5, each on a
# separate line.
# (Note that if the file is not initially empty, then
# the stream values are appended to the end of the
# nonempty file.)
# HOW TO DEVELOP THE STREAMING PROGRAM.
# First step:
# Write a function write_value_to_file(v) that
# appends the json representation of v to the
# file with name filename with each append on
# a new line.
# Second step:
# Wrap the function, write_value_to_file, to
# obtain the desired function, stream_to_file.
# Define write_value_to_file inside the definition
# of stream_to_file so that write_value_to_file can
# access the parameter, filename.
def stream_to_file(stream, filename):
    def write_value_to_file(v):
        # Open in append mode so that existing contents are kept.
        with open(filename, 'a') as output_file:
            output_file.write(json.dumps(v) + '\n')
    return stream_func(
        inputs=stream, f_type='element',
        f=write_value_to_file, num_outputs=0)
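#
# A sketch of the file format produced for the values [3, 5],
# written without streams. append_json_line is a hypothetical helper
# that mirrors the body of write_value_to_file; the temporary file
# is used only so that the sketch cleans up after itself.

```python
import json
import os
import tempfile


def append_json_line(filename, value):
    # Append mode: earlier lines in the file are preserved.
    with open(filename, 'a') as output_file:
        output_file.write(json.dumps(value) + '\n')


fd, path = tempfile.mkstemp(suffix='.txt')
os.close(fd)
try:
    for v in [3, 5]:
        append_json_line(path, v)
    with open(path, 'r') as input_file:
        contents = input_file.read()
finally:
    os.remove(path)
```

# contents holds one json representation per line: '3\n5\n'.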
# EXAMPLE 4
# Illustrates synchronous merge of multiple inputs
# in a sink (i.e., no output).
# SPECIFICATION:
# Write a function, print_sums, with a single parameter
# which is a list of streams. The function returns None.
# The function prints the sum of the k-th elements of
# all its input streams for all k.
def print_sums(list_of_streams):
    def print_sum_of_list(list_of_elements):
        # The parenthesized form behaves the same in Python 2 and 3.
        print('sum of {0} = {1}'.format(
            list_of_elements, sum(list_of_elements)))
    return stream_func(inputs=list_of_streams,
                       f_type='element',
                       f=print_sum_of_list,
                       num_outputs=0)
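#
# On plain lists, the synchronous pairing of k-th elements resembles
# zip. The sketch below is an analogy under that assumption, not the
# wrapper's implementation; list_of_lists is a made-up input.

```python
list_of_lists = [[1, 2, 3], [10, 20, 30], [100, 200, 300]]

# zip(*list_of_lists) groups the k-th elements of all the lists,
# much as the wrapped function receives a list holding the k-th
# element of every input stream.
sums = [sum(kth_elements) for kth_elements in zip(*list_of_lists)]
```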
# EXAMPLE 5
# For an illustration of a sink with asynchronous merge
# of its inputs see Section 7.
#######################################################
# PART 3: SOURCES
# NO INPUT STREAMS, ONE OR MORE OUTPUT STREAMS
#######################################################
#______________________________________________________
# PART 3A: Stateless
# No inputs, one or more outputs, stateless functions
# Illustrates the use of call_streams and illustrates
# that append and extend of a stream are analogous to
# the same operations on lists.
#______________________________________________________
# EXAMPLE 1
# SPECIFICATION:
# Write a function, timer, with three parameters:
# output_stream, num_outputs and time_period where
# output_stream is a Stream, num_outputs is a positive integer
# and time_period is a positive number.
#
# The function generates a stream consisting
# of the values [0, 1,..., num_outputs-1]. An integer
# is output to the stream every time_period seconds.
# THE STREAMING PROGRAM.
# Illustrates that stream.append(i) is analogous
# to l.append(i) where stream is a Stream and l
# is a list.
import time
def timer(output_stream, num_outputs, time_period):
    """
    Parameters
    ----------
    output_stream: Stream
    num_outputs: int, positive
    time_period: int or float, positive

    """
    for i in range(num_outputs):
        output_stream.append(i)
        time.sleep(time_period)
# EXAMPLE 2
# SPECIFICATION:
# Write a function, rand, with three parameters:
# output_stream, num_outputs, time_period.
# where output_stream is a stream, num_outputs is
# a nonnegative number and time_period is an
# optional positive number.
# The function generates a stream of
# num_outputs random numbers. If time_period
# is provided, a random number is appended
# to the stream periodically with the period
# time_period. If time_period is not provided
# random numbers are appended to the stream
# continuously.
# THE STREAMING PROGRAM.
import time
import random
def rand(output_stream, num_outputs, time_period=0):
    """
    Parameters
    ----------
    output_stream: Stream
    num_outputs: int, nonnegative
    time_period: int or float, nonnegative
        If 0 (the default), values are appended without pausing.

    """
    if not time_period:
        for _ in range(num_outputs):
            output_stream.append(random.random())
    else:
        for _ in range(num_outputs):
            output_stream.append(random.random())
            time.sleep(time_period)
# EXAMPLE 3
# SPECIFICATION:
# Write a function, file_to_stream, with
# three parameters: filename, output_stream
# and time_period (optional)
# The function reads a file called filename.
# The file has json representations of objects,
# with one or more representations per line.
# The function appends the objects in the file,
# to the stream. Objects from one line of the file
# are appended to the stream every time_period seconds
# if time_period is specified. If time_period
# is not specified, the function appends objects
# from the file to the stream continuously.
# THE STREAMING PROGRAM.
import time
def file_to_stream(filename, output_stream, time_period=0):
    """
    Parameters
    ----------
    filename: str
    output_stream: Stream
    time_period: int or float, nonnegative

    """
    with open(filename, 'r') as input_file:
        for line in input_file:
            # Each whitespace-separated token is parsed as one
            # json value.
            values = [json.loads(v) for v in line.split()]
            output_stream.extend(values)
            if time_period:
                time.sleep(time_period)
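#
# A sketch of the per-line parsing used above, applied to a string
# instead of a file. parse_line is a hypothetical helper that repeats
# the list comprehension in file_to_stream. Because the line is split
# on whitespace, a json value that itself contains a space is cut
# into fragments that json.loads rejects.

```python
import json


def parse_line(line):
    return [json.loads(token) for token in line.split()]


# Several values on one line, none containing whitespace.
values = parse_line('1 2.5 "a" [3,4]\n')

# '[3, 4]' is split into '[3,' and '4]', neither of which is json.
try:
    parse_line('[3, 4]')
    split_inside_value = False
except ValueError:
    split_inside_value = True
```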
# EXAMPLE 4
# Illustrates the use of call_streams
# SPECIFICATION:
# Write a function, single_stream_of_random_numbers,
# that returns a single stream of random numbers.
# A random number is appended to the output stream
# when the parameter timer_stream is modified.
# Note that in the wrapper, stream_func, the
# parameter call_streams is a LIST of streams, and
# so, the correct code is:
# call_streams=[timer_stream]
# not:
# call_streams=timer_stream
# THE STREAMING PROGRAM.
def single_stream_of_random_numbers(timer_stream):
    return stream_func(
        inputs=None,
        f_type='element',
        f=random.random,
        num_outputs=1,
        call_streams=[timer_stream])
#######################################################
# PART 4: SPLIT
# SINGLE INPUT STREAM, TWO OR MORE OUTPUT STREAMS.
#######################################################
#______________________________________________________
# PART 4A: Stateless
# Single input, two or more outputs, stateless functions
# A python function with a single input and a tuple of
# outputs is wrapped to produce a function with a single
# input stream and a list of output streams
#______________________________________________________
# EXAMPLE 1
# SPECIFICATION:
# Write a function, square_and_double_stream, with a
# single parameter: an input stream. The function returns
# a list of two streams where the elements of the first
# output stream are squares of the elements of the input
# stream and the elements of the second output stream are
# twice those of the input stream. If the input is the
# stream [0, 1, 2, 3, ...] then the function returns a list
# of two streams the first of which is [0, 1, 4, 9, ...]
# and the second is [0, 2, 4, 6, ..]
# HOW TO DEVELOP THE STREAMING PROGRAM.
# First step:
# Write a function, square_and_double with a single parameter,
# a number. The function returns a tuple of two values, the
# square and double of the input.
def square_and_double(m):
    return (m*m, 2*m)
# Second step:
# Wrap the function, square_and_double, to
# obtain the desired function, square_and_double_stream.
def square_and_double_stream(stream):
    return stream_func(
        inputs=stream,
        f_type='element',
        f=square_and_double,
        num_outputs=2 #Two output streams
        )
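#
# A sketch of the split on plain lists: the i-th value returned by
# the function goes to the i-th output. split_elementwise is a
# hypothetical helper, not the library's wrapper.

```python
def square_and_double(m):
    return (m * m, 2 * m)


def split_elementwise(f, values, num_outputs):
    # Hypothetical stand-in for the wrapper with num_outputs > 1.
    outputs = [[] for _ in range(num_outputs)]
    for v in values:
        # Route the i-th result of f(v) to the i-th output list.
        for output, result in zip(outputs, f(v)):
            output.append(result)
    return outputs


squares, doubles = split_elementwise(square_and_double, [0, 1, 2, 3], 2)
```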
# EXAMPLE 2
# SPECIFICATION:
# Write a function, exp_mult_div_stream, with four
# parameters: stream, exponent, multiplier, and
# divisor where the last three parameters are numbers.
# The function returns a list of three streams where
# the elements of the streams are the elements of the
# input stream raised to exponent, multiplied by
# multiplier and divided by divisor, respectively.
# If the input stream is [0, 1, 2, 3, ...] and exponent
# is 3, multiplier is 10, and divisor is 0.25 then the
# function returns a list of three streams:
# [0, 1, 8, 27, ...], [0, 10, 20, 30, ...], and
# [0, 4, 8, 12, ...]
# HOW TO DEVELOP THE STREAMING PROGRAM.
# Wrap the function (see below) exp_mult_div_number
def exp_mult_div_stream(stream, exponent, multiplier, divisor):
    def exp_mult_div_number(n):
        return [n**exponent, n*multiplier, n/divisor]
    return stream_func(inputs=stream,
                       f_type='element',
                       f=exp_mult_div_number,
                       num_outputs=3 # Returns list of 3 streams.
                       )
# EXAMPLE 3
# Illustrates use of _no_value
# SPECIFICATION:
# Write a function, even_odd_stream, with one parameter: stream.
# The function returns a list of two streams, the first containing
# the even values of the input stream, and the second containing
# the odd values.
# HOW TO DEVELOP THE STREAMING PROGRAM.
# First step:
# Write a function, even_odd with a single parameter,
# a number. The function returns a list with 2 values,
# that will be inserted into the two output streams of the
# wrapped function. even_odd returns [m, _no_value] if m
# is even, because the even stream gets m and the odd
# stream gets no value. Symmetrically, even_odd returns
# [_no_value, m] if m is odd.
def even_odd(m):
    if m%2: return [_no_value, m]
    else: return [m, _no_value]
# Second step:
# Wrap the function, even_odd, to get the desired function.
def even_odd_stream(stream):
    return stream_func(inputs=stream,
                       f_type='element',
                       f=even_odd,
                       num_outputs=2 # Returns list of 2 streams.
                       )
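#
# A sketch of the routing on plain lists. NO_VALUE is a hypothetical
# sentinel standing in for _no_value, and the loop below is only an
# illustration of which output receives each element; it is not the
# wrapper's implementation.

```python
NO_VALUE = object()   # hypothetical sentinel, standing in for _no_value


def even_odd(m):
    if m % 2:
        return [NO_VALUE, m]   # odd: only the second output gets m
    return [m, NO_VALUE]       # even: only the first output gets m


evens, odds = [], []
for m in [0, 1, 2, 3, 4, 5]:
    to_evens, to_odds = even_odd(m)
    if to_evens is not NO_VALUE:
        evens.append(to_evens)
    if to_odds is not NO_VALUE:
        odds.append(to_odds)
```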
#______________________________________________________
# PART 4B: Stateful
# Single input, two or more outputs, stateful functions
# Write a python function with two inputs --- a stream element
# and a state --- and that returns a tuple of stream
# elements and the next state. Wrap the function to produce a
# function with a single input stream and a list of output
# streams.
#______________________________________________________
# EXAMPLE 1
# SPECIFICATION:
# Write a function that has a single input stream where the
# elements of the stream are tuples:
# (sensor name, time, sensor reading)
# The sensor name is either 'temperature' or 'humidity'
# The time is a positive integer. The sensor reading is
# a number where the temperature reading is greater than -274,
# and the humidity reading is non-negative.
# The function returns two output streams, one for temperature
# and one for humidity. If a temperature reading in the input
# stream differs from the previous temperature output by DELTA
# or less, where DELTA is a constant parameter, then that reading
# is not placed on the output stream; if it differs by more than
# DELTA then it is placed on the output stream. Similarly for
# humidity data.
# The parameters of the function are the input stream and DELTA.
# The function returns a list of two output streams, one for
# temperature and the other for humidity.
# HOW TO DEVELOP THE STREAMING PROGRAM.
# First step:
# The state of the agent is the last value output on each of
# the temperature and humidity streams. The state is represented
# by a tuple of 2 numbers.
# Write a function t_and_h with two parameters:
# (1) msg: a 3-tuple (sensor_name, time, reading), and
# (2) a state (last_temperature, last_humidity)
# The function returns:
# (1) A 2-tuple representing the next outputs on the temperature
# and humidity streams, and
# (2) the next state.
# The function reads a constant DELTA defined outside the function.
# Second step:
# Wrap the function, t_and_h, to get the desired function.
# Set the initial state to be a 2-tuple.
def temperature_and_humidity_streams(stream, DELTA):
    def t_and_h(msg, state):
        sensor_name, time, reading = msg
        index = 0 if sensor_name == 'temperature' else 1
        next_output = [_no_value, _no_value]
        # Copy the state so that the update below works whether the
        # state is a tuple or a list, and never mutates the old state.
        next_state = list(state)
        if abs(state[index] - reading) > DELTA:
            next_output[index] = msg
            next_state[index] = reading
        return (next_output, next_state)
    return stream_func(inputs=stream,
                       f_type='element',