-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathChatTracker.java
More file actions
95 lines (83 loc) · 4.26 KB
/
ChatTracker.java
File metadata and controls
95 lines (83 loc) · 4.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// Author: milad teimouri 95725127
// Run the job: .\flink.bat run E:\courses\master\term2\DistributedSystem\CEP\Windows\flink\target\flink-1.0-SNAPSHOT.jar
// Open the input socket the job reads from: .\nc.exe -l -p 9000
// Read the job's output file: Get-Content flink-milad72t-jobmanager-MILADD.out
package flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.CollectionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.tuple.Tuple2;
import javax.xml.stream.FactoryConfigurationError;
public class ChatTracker {
public static void main (String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> chatPatternStream = env
.socketTextStream("localhost", 9000)
.map(new ChatSplitter()) // convert string format to tuple
.keyBy(0) // key by sender and receiver of message (Sender->Receiver)
// .countWindow(3,1)
.flatMap(new PatternTracker()); // check every 3 message by same sender and receiver for detect specifc pattern
chatPatternStream.print();
env.execute("CEP chat tracker By Milad");
}
public static class ChatSplitter implements
MapFunction<String, Tuple2<String,String>> {
public Tuple2<String,String> map(String sentence)
throws Exception {
String string = sentence;
String[] parts = string.split("#");
if (parts.length == 2){
return Tuple2.of( parts[0], parts[1]);
}
return Tuple2.of( "Unkwon", "message");
}
}
public static class PatternTracker extends RichFlatMapFunction<Tuple2<String,String>,String>{
private transient ValueState<Integer> FlagPattern; // flag for maintain of state
public void flatMap(Tuple2<String , String> input , Collector<String> out)
throws Exception {
Integer State = FlagPattern.value(); // current state
if (input.f1.equals("salam")){
FlagPattern.update(1);
out.collect(String.format("%s , message : %s",input.f0,input.f1));
}
else if(input.f1.equals("khoubi") && State==1){
FlagPattern.update(2);
out.collect(String.format("%s , message : %s",input.f0,input.f1));
}
else if(input.f1.equals("chetouri") && State==2){
FlagPattern.update(0);
out.collect(String.format("Warning!! pattern detect between %s !!",input.f0));
}
else{
FlagPattern.update(0);
out.collect(String.format("%s , message : %s",input.f0,input.f1));
}
}
public void open(Configuration config){
ValueStateDescriptor<Integer> descriptor=
new ValueStateDescriptor<Integer>(
"FlagPatternDetector" ,
TypeInformation.of(new TypeHint<Integer>() {}),
Integer.valueOf(0) // first initiate of flag
);
FlagPattern = getRuntimeContext().getState(descriptor);
}
}
}