diff --git a/.circleci/config.yml b/.circleci/config.yml
new file mode 100644
index 0000000..44ee543
--- /dev/null
+++ b/.circleci/config.yml
@@ -0,0 +1,55 @@
+# Java Maven CircleCI 2.0 configuration file
+#
+# Check https://circleci.com/docs/2.0/language-java/ for more details
+#
+# Continuous Integration!
+version: 2
+jobs:
+ build:
+ docker:
+ # specify the version you desire here
+ - image: circleci/openjdk:8-jdk
+
+ # Specify service dependencies here if necessary
+ # CircleCI maintains a library of pre-built images
+ # documented at https://circleci.com/docs/2.0/circleci-images/
+ # - image: circleci/postgres:9.4
+
+ working_directory: ~/repo
+
+ environment:
+ # Customize the JVM maximum heap limit
+ MAVEN_OPTS: -Xmx3200m
+
+ steps:
+ - checkout
+
+ # Download and cache dependencies
+ - restore_cache:
+ keys:
+ - v1-dependencies-{{ checksum "pom.xml" }}
+ # fallback to using the latest cache if no exact match is found
+ - v1-dependencies-
+
+ - run: mvn package -P build-jar dependency:go-offline
+
+ - save_cache:
+ paths:
+ - ~/.m2
+ key: v1-dependencies-{{ checksum "pom.xml" }}
+
+ # run tests!
+ #- run: mvn integration-test
+
+ - run: |
+ set -xu
+ mkdir -p /tmp/artifacts
+ cp target/smartgrid*.jar /tmp/artifacts
+
+ # Save artifacts
+ - store_artifacts:
+ path: /tmp/artifacts
+ destination: build
+
+
+
diff --git a/.idea/hydra.xml b/.idea/hydra.xml
new file mode 100644
index 0000000..123e89c
--- /dev/null
+++ b/.idea/hydra.xml
@@ -0,0 +1,9 @@
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/docker/Docker-compose.yml b/docker/Docker-compose.yml
deleted file mode 100755
index e07fbd7..0000000
--- a/docker/Docker-compose.yml
+++ /dev/null
@@ -1,16 +0,0 @@
-
-version: '2.1'
-services:
-#Flink + Kafka + Hbase + Zookeeper
- flink-kafka-hbase:
- image:
- container
-# Elasticsearch
- elasticsearch:
-
-# Kibana
- kibana:
-
-# Kafka-manager
- kafka-manager:
-
\ No newline at end of file
diff --git a/docker/Flink-Hbase-Kafka/docker-compose.yml b/docker/Flink-Hbase-Kafka/docker-compose.yml
new file mode 100755
index 0000000..e97db32
--- /dev/null
+++ b/docker/Flink-Hbase-Kafka/docker-compose.yml
@@ -0,0 +1,29 @@
+version: '2.1'
+services:
+ flink-kafka-hbase:
+ image: quay.io/koldbyte/flink-cluster
+ container_name: flink_to_hbase
+ hostname: hbase-docker
+ ports:
+ - 8080:8080
+ - 8085:8085
+ - 9090:9090
+ - 9092:9092
+ - 9095:9095
+ - 2181:2181
+ - 16000:16000
+ - 16010:16010
+ - 16020:16020
+ - 16030:16030
+ - 6123:6123
+ - 7203:7203
+ - 8081:8081
+ - 8090:8090
+ environment:
+ KAFKA_ADVERTISED_HOST_NAME: hbase-docker
+ volumes:
+ - /data
+ external_links:
+ - elasticsearch
+ - kibana
+ - kafka-manager
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
new file mode 100755
index 0000000..6b7b24a
--- /dev/null
+++ b/docker/docker-compose.yml
@@ -0,0 +1,67 @@
+version: '2.1'
+services:
+ flink-kafka-hbase:
+ domainname: smartgrid.com
+ extends:
+ service: flink-kafka-hbase
+ file: ./Flink-Hbase-Kafka/docker-compose.yml
+ networks:
+ - smartgrid.com
+ volumes:
+ - /app/smartgrid/data:/data
+ links:
+ - elasticsearch
+ - elasticsearch2
+
+ elasticsearch:
+ domainname: smartgrid.com
+ extends:
+ service: elasticsearch
+ file: ./elasticsearch/docker-compose.yml
+ networks:
+ - smartgrid.com
+ volumes:
+ - /app/smartgrid/es-data:/usr/share/elasticsearch/data
+
+ elasticsearch2:
+ domainname: smartgrid.com
+ extends:
+ service: elasticsearch2
+ file: ./elasticsearch/docker-compose.yml
+ networks:
+ - smartgrid.com
+ volumes:
+ - /app/smartgrid/es-data-2:/usr/share/elasticsearch/data
+
+ kibana:
+ domainname: smartgrid.com
+ extends:
+ service: kibana
+ file: ./kibana/docker-compose.yml
+ networks:
+ - smartgrid.com
+ depends_on:
+ - elasticsearch
+ links:
+ - elasticsearch
+ - elasticsearch2
+
+ kafka_manager:
+ domainname: smartgrid.com
+ extends:
+ service: kafka_manager
+ file: ./kafka-manager/docker-compose.yml
+ networks:
+ - smartgrid.com
+ volumes:
+ - /app/smartgrid/kafka-manager:/kafka-manager/configuration
+ depends_on:
+ - flink-kafka-hbase
+ links:
+ - flink-kafka-hbase
+ - "flink-kafka-hbase:hbase-docker"
+
+networks:
+ smartgrid.com:
+ driver: bridge
+ name: smartgrid.com
\ No newline at end of file
diff --git a/docker/elasticsearch/docker-compose.yml b/docker/elasticsearch/docker-compose.yml
index 97f4364..0b6dbd7 100755
--- a/docker/elasticsearch/docker-compose.yml
+++ b/docker/elasticsearch/docker-compose.yml
@@ -1,54 +1,59 @@
-elasticsearch:
- image: docker.elastic.co/elasticsearch/elasticsearch:5.1.2
- ports:
- - 9200:9200
- - 9300:9300
- container_name: elasticsearch
- ulimits:
- memlock:
- soft: -1
- hard: -1
- mem_limit: 1g
- environment:
- - cluster.name=docker-cluster
- - node.name=one
- - bootstrap.memory_lock=false
- - xpack.security.enabled=false
- - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- - network.publish_host=192.168.99.100
- - transport.publish_port=9300
- - transport.host=0.0.0.0
- - transport.tcp.port=9300
- - network.host=0.0.0.0
- - http.host=0.0.0.0
- - http.port=9200
- volumes:
- - /usr/share/elasticsearch/data
+version: '2.1'
+services:
+ elasticsearch:
+ image: docker.elastic.co/elasticsearch/elasticsearch:5.1.2
+ hostname: elasticsearch
+ ports:
+ - 9200:9200
+ - 9300:9300
+ container_name: elasticsearch
+ ulimits:
+ memlock:
+ soft: -1
+ hard: -1
+ mem_limit: 1g
+ environment:
+ - cluster.name=docker-cluster
+ - node.name=one
+ - bootstrap.memory_lock=false
+ - xpack.security.enabled=false
+ - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
+ - network.publish_host=elasticsearch
+ - transport.publish_port=9300
+ - transport.host=0.0.0.0
+ - transport.tcp.port=9300
+ - network.host=0.0.0.0
+ - http.host=0.0.0.0
+ - http.port=9200
+ volumes:
+ - /usr/share/elasticsearch/data
-elasticsearch2:
- image: docker.elastic.co/elasticsearch/elasticsearch:5.1.2
- ports:
- - 9201:9200
- - 9301:9300
- container_name: elasticsearch2
- ulimits:
- memlock:
- soft: -1
- hard: -1
- mem_limit: 1g
- environment:
- - cluster.name=docker-cluster
- - node.name=two
- - bootstrap.memory_lock=false
- - xpack.security.enabled=false
- - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- - network.publish_host=192.168.99.100
- - transport.publish_port=9301
- - "discovery.zen.ping.unicast.hosts=192.168.99.100"
- - "discovery.zen.minimum_master_nodes=2"
- - network.host=0.0.0.0
- - transport.host=0.0.0.0
- - transport.tcp.port=9300
- - http.host=0.0.0.0
- volumes:
- - /usr/share/elasticsearch/data
+ elasticsearch2:
+ image: docker.elastic.co/elasticsearch/elasticsearch:5.1.2
+ hostname: elasticsearch2
+ ports:
+ - 9201:9200
+ - 9301:9300
+ container_name: elasticsearch2
+ ulimits:
+ memlock:
+ soft: -1
+ hard: -1
+ mem_limit: 1g
+ environment:
+ - cluster.name=docker-cluster
+ - node.name=two
+ - bootstrap.memory_lock=false
+ - xpack.security.enabled=false
+ - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
+ - network.publish_host=elasticsearch2
+ - transport.publish_port=9300
+ - "discovery.zen.ping.unicast.hosts=elasticsearch"
+ - "discovery.zen.minimum_master_nodes=2"
+ - network.host=0.0.0.0
+ - transport.host=0.0.0.0
+ - transport.tcp.port=9300
+ - http.host=0.0.0.0
+ - http.port=9200
+ volumes:
+ - /usr/share/elasticsearch/data
diff --git a/docker/kafka-manager/Docker-compose.yml b/docker/kafka-manager/docker-compose.yml
similarity index 61%
rename from docker/kafka-manager/Docker-compose.yml
rename to docker/kafka-manager/docker-compose.yml
index f6a0b69..982f33a 100755
--- a/docker/kafka-manager/Docker-compose.yml
+++ b/docker/kafka-manager/docker-compose.yml
@@ -1,9 +1,12 @@
-services:
- kafka_manager:
- image: hlebalbau/kafka-manager
- ports:
- - "9000:9000"
- environment:
- ZK_HOSTS: "zoo:2181"
- APPLICATION_SECRET: "random-secret"
+version: '2.1'
+services:
+ kafka_manager:
+ image: hlebalbau/kafka-manager
+ hostname: kafka-manager
+ container_name: kafka-manager
+ ports:
+ - "9000:9000"
+ environment:
+ ZK_HOSTS: "hbase-docker:2181/kafka"
+ APPLICATION_SECRET: "random-secret"
command: -Dpidfile.path=/dev/null
\ No newline at end of file
diff --git a/docker/kibana/Docker-compose.yml b/docker/kibana/docker-compose.yml
similarity index 56%
rename from docker/kibana/Docker-compose.yml
rename to docker/kibana/docker-compose.yml
index 7214ea9..5617be5 100755
--- a/docker/kibana/Docker-compose.yml
+++ b/docker/kibana/docker-compose.yml
@@ -1,11 +1,14 @@
-version: '2'
-services:
- kibana:
- image: kibana:5.1.2
- container_name: kibana
- restart: always
- network_mode: "bridge"
- ports:
- - "5601:5601"
- external_links:
- - elasticsearch:elasticsearch
+version: '2.1'
+services:
+ kibana:
+ image: kibana:5.1.2
+ hostname: kibana
+ container_name: kibana
+ restart: always
+ ports:
+ - "5601:5601"
+ environment:
+ - ELASTICSEARCH_URL=http://elasticsearch:9200
+ - SERVER_NAME=elasticsearch
+ external_links:
+ - elasticsearch:elasticsearch
diff --git a/pom.xml b/pom.xml
index 2eaa215..9c5fe3d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,6 +58,7 @@ under the License.
5.1.2
0.11.0.2
+ 3.2
@@ -159,6 +160,11 @@ under the License.
flink-connector-elasticsearch5_${scala.binary.version}
${flink.version}
+
+ com.tdunning
+ t-digest
+ ${tdigest.version}
+
diff --git a/smartgrid.iml b/smartgrid.iml
index e8f7a49..e75d68b 100644
--- a/smartgrid.iml
+++ b/smartgrid.iml
@@ -10,6 +10,26 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -126,7 +146,6 @@
-
@@ -150,6 +169,7 @@
+
diff --git a/src/main/resources/META-INF/MANIFEST.MF b/src/main/resources/META-INF/MANIFEST.MF
index f7055b0..c89a621 100644
--- a/src/main/resources/META-INF/MANIFEST.MF
+++ b/src/main/resources/META-INF/MANIFEST.MF
@@ -1,4 +1,3 @@
Manifest-Version: 1.0
-Main-Class: com.bhaskardivya.projects.smartgrid.SmartGridProcessorFrom
- File
+Main-Class: com.bhaskardivya.projects.smartgrid.job.PlugAveragingJob
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/archived/SensorEventHouseAveragingJob.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/archived/SensorEventHouseAveragingJob.scala
deleted file mode 100644
index 7d6a64f..0000000
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/archived/SensorEventHouseAveragingJob.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-package com.bhaskardivya.projects.smartgrid.archived
-
-import com.bhaskardivya.projects.smartgrid.base.AbstractKeyGetter
-import com.bhaskardivya.projects.smartgrid.model._
-import com.bhaskardivya.projects.smartgrid.operators.AverageAggregateWithKey
-import com.bhaskardivya.projects.smartgrid.pipeline._
-import com.bhaskardivya.projects.smartgrid.sinks.HBaseOutputFormatAverageWithKey
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.core.fs.FileSystem
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
-import org.apache.flink.streaming.api.windowing.time.Time
-
-object SensorEventHouseAveragingJob {
- def main(args: Array[String]): Unit = {
- // parse parameters
- val params = ParameterTool.fromArgs(args)
-
- // Initialise the environment for flink
- val env = StreamExecutionEnvironment.getExecutionEnvironment
-
- // will be using the timestamp from the records
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-
- // Get the stream according to params
- val stream: DataStream[SensorEvent] = SourceChooser.from(env, params).name("Sensor Parsed Data")
-
- val withTimestamps = stream
- .assignTimestampsAndWatermarks(SensorEvent.tsAssigner())
- .name("Kafka Source with TS")
-
- val windowed1min = withTimestamps
- .keyBy(keyGetter(_))
- .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(Constants.SLIDING_INTERVAL)))
- .aggregate(new AverageAggregateWithKey(keyGetter))
- .name("Average for 1 min Window")
-
- val windowed5min = windowed1min
- .keyBy(_.key)
- .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.seconds(Constants.SLIDING_INTERVAL)))
- .reduce(AverageWithKey.reducer)
- .name("Average for 5 min Window")
-
- val windowed15min = windowed5min
- .keyBy(_.key)
- .window(SlidingEventTimeWindows.of(Time.minutes(15), Time.seconds(Constants.SLIDING_INTERVAL)))
- .reduce(AverageWithKey.reducer)
- .name("Average for 15 min Window")
-
- val windowed60min = windowed15min
- .keyBy(_.key)
- .window(SlidingEventTimeWindows.of(Time.minutes(60), Time.seconds(Constants.SLIDING_INTERVAL)))
- .reduce(AverageWithKey.reducer)
- .name("Average for 60 min Window")
-
- val windowed120min = windowed60min
- .keyBy(_.key)
- .window(SlidingEventTimeWindows.of(Time.minutes(120), Time.seconds(Constants.SLIDING_INTERVAL)))
- .reduce(AverageWithKey.reducer)
- .name("Average for 120 min Window")
-
- val debug = params.has("debug")
- if(debug){
- windowed1min.writeAsCsv("/data/output.windowed1min.csv", FileSystem.WriteMode.OVERWRITE)
- windowed5min.writeAsCsv("/data/output.windowed5min.csv", FileSystem.WriteMode.OVERWRITE)
- windowed15min.writeAsCsv("/data/output.windowed15min.csv", FileSystem.WriteMode.OVERWRITE)
- windowed60min.writeAsCsv("/data/output.windowed60min.csv", FileSystem.WriteMode.OVERWRITE)
- windowed120min.writeAsCsv("/data/output.windowed120min.csv", FileSystem.WriteMode.OVERWRITE)
- }
-
- windowed1min.addSink(new OutputFormatSinkFunction[AverageWithKey](new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_1MIN, Constants.HOUSE_CF)))
- .name("House - 1 Min Window")
-
- windowed5min.addSink(new OutputFormatSinkFunction[AverageWithKey](new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_5MIN, Constants.HOUSE_CF)))
- .name("House - 5 Min Window")
-
- windowed15min.addSink(new OutputFormatSinkFunction[AverageWithKey](new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_15MIN, Constants.HOUSE_CF)))
- .name("House - 15 Min Window")
-
- windowed60min.addSink(new OutputFormatSinkFunction[AverageWithKey](new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_60MIN, Constants.HOUSE_CF)))
- .name("House - 60 Min Window")
-
- windowed120min.addSink(new OutputFormatSinkFunction[AverageWithKey](new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_120MIN, Constants.HOUSE_CF)))
- .name("House - 120 Min Window")
-
- env.execute("Sensor Event House Averaging Job (Kafka to HBase Averages) ")
-
- }
-
- object keyGetter extends AbstractKeyGetter{
- def apply(element: SensorEvent): SensorKeyObject = {
- SensorKeyObject(element.house_id)
- }
- }
-
-}
\ No newline at end of file
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/archived/SensorEventHouseAveragingJob2.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/archived/SensorEventHouseAveragingJob2.scala
deleted file mode 100644
index 506667d..0000000
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/archived/SensorEventHouseAveragingJob2.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-package com.bhaskardivya.projects.smartgrid.archived
-
-import com.bhaskardivya.projects.smartgrid.model.{Constants, SensorEvent}
-import com.bhaskardivya.projects.smartgrid.operators.AverageAggregate
-import com.bhaskardivya.projects.smartgrid.pipeline._
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.core.fs.FileSystem
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
-import org.apache.flink.streaming.api.windowing.time.Time
-
-object SensorEventHouseAveragingJob2 {
- def main(args: Array[String]): Unit = {
- // parse parameters
- val params = ParameterTool.fromArgs(args)
-
- // Initialise the environment for flink
- val env = StreamExecutionEnvironment.getExecutionEnvironment
-
- // will be using the timestamp from the records
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-
- // Get the stream according to params
- val stream: DataStream[SensorEvent] = SourceChooser.from(env, params).name("Sensor Raw")
-
- val withTimestampsKeyed = stream
- //id, timestamp, value, property, plug_id, household_id, house_id
- //.map(x => SensorEvent(x(0).toLong, x(1).toLong, x(2).toDouble, x(3).toInt, x(4).toLong, x(5).toLong, x(6).toLong))
- .assignTimestampsAndWatermarks(SensorEvent.tsAssigner())
- .name("Kafka Source with TS")
- .keyBy(_.house_id)
-
- val windowed1min = withTimestampsKeyed
- .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(Constants.SLIDING_INTERVAL)))
- .aggregate(new AverageAggregate())//, new AverageWindowFunction())
- .name("Average for 1 min Window")
-
- windowed1min.writeAsCsv("/data/window_1min_out.csv", FileSystem.WriteMode.OVERWRITE)
-
- val windowed5min = withTimestampsKeyed
- .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.seconds(Constants.SLIDING_INTERVAL)))
- .aggregate(new AverageAggregate())//, new AverageWindowFunction())
- .name("Average for 5 min Window")
-
- val windowed15min = withTimestampsKeyed
- .window(SlidingEventTimeWindows.of(Time.minutes(15), Time.seconds(Constants.SLIDING_INTERVAL)))
- .aggregate(new AverageAggregate())//, new AverageWindowFunction())
- .name("Average for 15 min Window")
-
- val windowed60min = withTimestampsKeyed
- .window(SlidingEventTimeWindows.of(Time.minutes(60), Time.seconds(Constants.SLIDING_INTERVAL)))
- .aggregate(new AverageAggregate())//, new AverageWindowFunction())
- .name("Average for 60 min Window")
-
- val windowed120min = withTimestampsKeyed
- .window(SlidingEventTimeWindows.of(Time.minutes(120), Time.seconds(Constants.SLIDING_INTERVAL)))
- .aggregate(new AverageAggregate())//, new AverageWindowFunction())
- .name("Average for 120 min Window")
-
- /* windowed1min.addSink(new OutputFormatSinkFunction[(Long, Average)](new HBaseOutputFormat().of(Constants.TABLE_1MIN, Constants.HOUSE_CF)))
- .name("House - 1 Min Window")
-
- windowed5min.addSink(new OutputFormatSinkFunction[(Long, Average)](new HBaseOutputFormat().of(Constants.TABLE_5MIN, Constants.HOUSE_CF)))
- .name("House - 5 Min Window")
-
- windowed15min.addSink(new OutputFormatSinkFunction[(Long, Average)](new HBaseOutputFormat().of(Constants.TABLE_15MIN, Constants.HOUSE_CF)))
- .name("House - 15 Min Window")
-
- windowed60min.addSink(new OutputFormatSinkFunction[(Long, Average)](new HBaseOutputFormat().of(Constants.TABLE_60MIN, Constants.HOUSE_CF)))
- .name("House - 60 Min Window")
-
- windowed120min.addSink(new OutputFormatSinkFunction[(Long, Average)](new HBaseOutputFormat().of(Constants.TABLE_120MIN, Constants.HOUSE_CF)))
- .name("House - 120 Min Window")*/
-
- env.execute("Sensor Event House Averaging Job (Kafka to HBase Averages) ")
-
- }
-}
\ No newline at end of file
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/archived/SensorEventPlugAveragingJob.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/archived/SensorEventPlugAveragingJob.scala
deleted file mode 100644
index 74caacb..0000000
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/archived/SensorEventPlugAveragingJob.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-package com.bhaskardivya.projects.smartgrid.archived
-
-import com.bhaskardivya.projects.smartgrid.model.{Average, Constants, SensorEvent}
-import com.bhaskardivya.projects.smartgrid.operators.{AverageAggregate, AverageWindowFunction}
-import com.bhaskardivya.projects.smartgrid.pipeline._
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
-import org.apache.flink.streaming.api.windowing.time.Time
-
-object SensorEventPlugAveragingJob {
- def main(args: Array[String]): Unit = {
- // parse parameters
- val params = ParameterTool.fromArgs(args)
-
- // Initialise the environment for flink
- val env = StreamExecutionEnvironment.getExecutionEnvironment
-
- // will be using the timestamp from the records
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-
- // Get the stream according to params
- val stream: DataStream[SensorEvent] = SourceChooser.from(env, params).name("Sensor Raw")
-
- //no need to add timestamps as kafka source already has it
- //val withTimestamps = stream
- //id, timestamp, value, property, plug_id, household_id, house_id
- //.map(x => SensorEvent(x(0).toLong, x(1).toLong, x(2).toDouble, x(3).toInt, x(4).toLong, x(5).toLong, x(6).toLong))
- //.assignTimestampsAndWatermarks(new PunctuatedAssigner())
-
- val windowed1min = stream
- .keyBy(_.plug_id)
- .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(Constants.SLIDING_INTERVAL)))
- .aggregate(new AverageAggregate(), new AverageWindowFunction())
- .name("Average for 1 min Window")
-
- val windowed5min = windowed1min
- .keyBy(_._1)
- .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.seconds(Constants.SLIDING_INTERVAL)))
- .reduce(
- (a: (Long,Average), b: (Long, Average)) => (a._1, a._2+b._2)
- )
- .name("Average for 5 min Window")
-
- val windowed15min = windowed5min
- .keyBy(_._1)
- .window(SlidingEventTimeWindows.of(Time.minutes(15), Time.seconds(Constants.SLIDING_INTERVAL)))
- .reduce(
- (a: (Long,Average), b: (Long, Average)) => (a._1, a._2+b._2)
- )
- .name("Average for 15 min Window")
-
- val windowed60min = windowed15min
- .keyBy(_._1)
- .window(SlidingEventTimeWindows.of(Time.minutes(60), Time.seconds(Constants.SLIDING_INTERVAL)))
- .reduce(
- (a: (Long,Average), b: (Long, Average)) => (a._1, a._2+b._2)
- )
- .name("Average for 60 min Window")
-
- val windowed120min = windowed60min
- .keyBy(_._1)
- .window(SlidingEventTimeWindows.of(Time.minutes(120), Time.seconds(Constants.SLIDING_INTERVAL)))
- .reduce(
- (a: (Long,Average), b: (Long, Average)) => (a._1, a._2+b._2)
- )
- .name("Average for 120 min Window")
-/*
-
- windowed1min.writeUsingOutputFormat(new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_1MIN, Constants.HOUSE_CF))
- .name("Plug - 1 Min Window")
-
- windowed5min.writeUsingOutputFormat(new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_5MIN, Constants.HOUSE_CF))
- .name("Plug - 5 Min Window")
-
- windowed15min.writeUsingOutputFormat(new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_15MIN, Constants.HOUSE_CF))
- .name("Plug - 15 Min Window")
-
- windowed60min.writeUsingOutputFormat(new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_60MIN, Constants.HOUSE_CF))
- .name("Plug - 60 Min Window")
-
- windowed120min.writeUsingOutputFormat(new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_120MIN, Constants.HOUSE_CF))
- .name("Plug - 120 Min Window")
-*/
-
- env.execute("Sensor Event Plug Averaging Job (Kafka to HBase Averages) ")
-
- }
-}
\ No newline at end of file
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/base/PredictionJobSingleWindowBase.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/base/PredictionJobSingleWindowBase.scala
new file mode 100644
index 0000000..f41399c
--- /dev/null
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/base/PredictionJobSingleWindowBase.scala
@@ -0,0 +1,188 @@
+package com.bhaskardivya.projects.smartgrid.base
+
+import java.io.File
+
+import com.bhaskardivya.projects.smartgrid.model._
+import com.bhaskardivya.projects.smartgrid.operators._
+import com.bhaskardivya.projects.smartgrid.pipeline._
+import com.bhaskardivya.projects.smartgrid.sinks.ElasticSearchSink
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.{DataStream, _}
+import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+/**
+ * Abstract class that represents the Main Job that calculates the
+ * averages and medians which are being used here itself to predict
+ * the load forecast
+ */
+abstract class PredictionJobSingleWindowBase extends Serializable {
+
+ //register implicits for types
+ implicit val typeInfoAverageWithKey: TypeInformation[AverageWithKey] = TypeInformation.of(classOf[AverageWithKey])
+ implicit val typeInfoPrediction2: TypeInformation[Prediction] = TypeInformation.of(classOf[Prediction])
+ implicit val typeInfoTime: TypeInformation[Time] = TypeInformation.of(classOf[Time])
+
+ private val LOG_DIR = "/data/" + getKeyName()
+
+ /**
+ * Method that returns the name of the key in the source datum
+ *
+ * @return String key name
+ */
+ def getKeyName(): String
+
+ /**
+ * Method that returns the value of the key in the source datum
+ *
+ * @param element SensorEvent record
+ * @return Long Key value
+ */
+ def getKey(element: SensorEvent): SensorKeyObject
+
+ object keyGetter extends AbstractKeyGetter {
+ def apply(element: SensorEvent): SensorKeyObject = {
+ getKey(element)
+ }
+ }
+
+ /**
+ * Method to prepare the raw events properly aggregated(sum) based on the key
+ * @param dataStream source raw data stream
+ * @return
+ */
+ def initializeFlow(dataStream: DataStream[SensorEvent]): DataStream[SensorEvent]
+
+ // Sliding window durations in minutes
+ val windowDurations = List(5)
+
+ def main(args: Array[String]): Unit = {
+
+ //Create the log dirs
+ try {
+ new File(LOG_DIR).mkdir()
+ new File(LOG_DIR + "/state/").mkdir()
+ new File(LOG_DIR + "/input/").mkdir()
+ new File(LOG_DIR + "/output_avg/").mkdir()
+ new File(LOG_DIR + "/output_prediction/").mkdir()
+ } catch {
+ case e: Exception => println("Directories already created" + e.getMessage)
+ }
+
+ // parse parameters
+ val params = ParameterTool.fromArgs(args)
+
+ // Initialise the environment for flink
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+ // will be using the timestamp from the records
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+ //env.enableCheckpointing(10000) // checkpoint every 10000 msecs
+ //env.setStateBackend(new FsStateBackend(LOG_DIR +"/state/"))
+
+ // Get the stream according to params
+ val rawStream: DataStream[SensorEvent] = SourceChooser.from(env, params).name("Sensor Source with Timestamp")
+
+ // Create a stream with sum according to the key specified
+ val initializedFlow = if (params.has("deduplicate")) initializeFlow(rawStream) else rawStream
+
+ if(params.getBoolean("sink.raw", false))
+ initializedFlow
+ .addSink(ElasticSearchSink[SensorEvent](params, Constants.ES_INDEX_NAME, Constants.ES_INDEX_TYPE_RAW))
+ .name("Sensor Raw to ES")
+
+ // Create a Global Window for work values which will output the missing load values
+ val averageUsingWorkValues: DataStream[AverageWithKey] = rawStream
+ .filter(_.property == Constants.PROPERTY_WORK)
+ .keyBy(e => getKey(e))
+ .countWindow(2, 1)
+ .process(new WorkValueProcessWindow)
+ .flatMap(new WorkValueFlatMap(60)(keyGetter)) //60 seconds
+
+ val averageWithKeys = initializedFlow
+ .map(e => AverageWithKey(keyGetter(e), Slice(Time.minutes(1))(e.timestamp), Average(e.value, 1)))
+ // Assumption: If both the Load values and work Values are available in a slice,
+ // Average of them will still be closer to the real measurement
+ .union(averageUsingWorkValues)
+
+ //create median states for various window duration
+ createMedianState(averageWithKeys)
+
+ // Create the average sliding window stream and corresponding Prediction Stream
+ val windowed_average_5min = createAverageStream(params, 5, averageWithKeys)
+ if(params.getBoolean("sink.5min", false)) {
+ val prediction_5min = createPredictionStream(params, 5, windowed_average_5min)
+ createPredictionSink(params, prediction_5min, Constants.ES_INDEX_TYPE_5MIN)
+ }
+
+ env.execute("Sensor Event" + getKeyName() + " Prediction Job")
+ }
+
+ /**
+ * Helper function to create median MapState for each window duration
+ * @param averageWithKeyStream DataStream of AverageWithKey
+ */
+ def createMedianState(averageWithKeyStream: DataStream[AverageWithKey]): Unit = {
+
+ // Streams for each window duration for the average
+ windowDurations.foreach(duration => {
+
+ val stateName = getStateName(duration)
+
+ val avg_windowed = averageWithKeyStream
+ .keyBy(_.key)
+ .window(TumblingEventTimeWindows.of(Time.minutes(duration)))
+ .reduce(new AverageWithKeyReducer)
+ .name(getKeyName() + " Average for " + duration + " min Tumbling Window")
+
+ // Store median as operator state
+ avg_windowed
+ .keyBy(_.key)
+ .flatMap(new MedianWithKeyMapper(stateName))
+ .name(getKeyName() + " Median state for " + duration + " min Tumbling Window")
+
+ })
+ }
+
+ def createAverageStream(params: ParameterTool, duration: Int, sourceStream: DataStream[AverageWithKey]): DataStream[AverageWithKey] = {
+
+ val slidingInterval = params.getInt("sliding.interval", Constants.SLIDING_INTERVAL)
+
+ val windowed_average = sourceStream
+ .keyBy(_.key)
+ // Map the Correct Slice duration
+ .map(e => AverageWithKey(e.key, Slice(Time.minutes(duration))(e.slice.timestamp), e.average))
+ .keyBy(_.key)
+ .window(SlidingEventTimeWindows.of(Time.minutes(duration), Time.seconds(slidingInterval)))
+ .reduce(new AverageWithKeyReducer)
+
+ windowed_average
+ }
+
+ def createPredictionStream(params: ParameterTool, duration: Int, averageStream: DataStream[AverageWithKey]): DataStream[Prediction] = {
+ val windowed_prediction: DataStream[Prediction] = averageStream
+ .keyBy(_.key)
+ .flatMap(new EnrichMapper(getStateName(duration)))
+ .name(getKeyName() + " Prediction values for " + duration + " min")
+
+ windowed_prediction
+ }
+
+ def createPredictionSink(params: ParameterTool, windowed_prediction: DataStream[Prediction], indexType: String) = {
+ // Sink the Predicted value streams to Elasticsearch
+ windowed_prediction
+ .addSink(ElasticSearchSink[Prediction](params, Constants.ES_INDEX_NAME, indexType))
+ .name(getKeyName() + " Prediction Sink - ES - " + indexType + " min Window")
+
+ // Write to file for debug
+ if(params.has("debug")){
+ windowed_prediction.writeAsText(LOG_DIR + "/output_prediction/windowed"+ indexType +".csv", FileSystem.WriteMode.OVERWRITE)
+ }
+ }
+
+ def getStateName(duration: Int): String = "median-" + duration + "min"
+}
\ No newline at end of file
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/base/SensorEventAveragingJobBase.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/base/SensorEventAveragingJobBase.scala
index a45104b..1277850 100644
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/base/SensorEventAveragingJobBase.scala
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/base/SensorEventAveragingJobBase.scala
@@ -1,18 +1,18 @@
package com.bhaskardivya.projects.smartgrid.base
import java.io.File
-import java.util.concurrent.TimeUnit
import com.bhaskardivya.projects.smartgrid.model._
-import com.bhaskardivya.projects.smartgrid.operators.{AverageAggregateWithKey, HBaseAsyncFunction, PredictionFunction}
+import com.bhaskardivya.projects.smartgrid.operators._
import com.bhaskardivya.projects.smartgrid.pipeline._
-import com.bhaskardivya.projects.smartgrid.sinks.{HBaseOutputFormatAverageWithKey, PredictionElasticSearchSink, SensorEventElasticSearchSink}
+import com.bhaskardivya.projects.smartgrid.sinks.ElasticSearchSink
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
+import org.apache.flink.streaming.api.scala.{DataStream, _}
+import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
/**
@@ -22,6 +22,11 @@ import org.apache.flink.streaming.api.windowing.time.Time
*/
abstract class SensorEventAveragingJobBase extends Serializable {
+ //register implicits for types
+ implicit val typeInfoAverageWithKey: TypeInformation[AverageWithKey] = TypeInformation.of(classOf[AverageWithKey])
+ implicit val typeInfoPrediction2: TypeInformation[Prediction] = TypeInformation.of(classOf[Prediction])
+ implicit val typeInfoTime: TypeInformation[Time] = TypeInformation.of(classOf[Time])
+
private val LOG_DIR = "/data/" + getKeyName()
/**
@@ -33,7 +38,7 @@ abstract class SensorEventAveragingJobBase extends Serializable {
/**
* Method to prepare the raw events properly aggregated(sum) based on the key
- * @param dataStream
+ * @param dataStream source raw data stream
* @return
*/
def initializeFlow(dataStream: DataStream[SensorEvent]): DataStream[SensorEvent]
@@ -41,8 +46,8 @@ abstract class SensorEventAveragingJobBase extends Serializable {
/**
* Method that returns the value of the key in the source datum
*
- * @param element
- * @return Long Key value
+ * @param element SensorEvent record
+ * @return Long Key value
*/
def getKey(element: SensorEvent): SensorKeyObject
@@ -58,16 +63,20 @@ abstract class SensorEventAveragingJobBase extends Serializable {
}
}
+ // Sliding window durations in minutes
+ val windowDurations = List(1, 5, 15, 60, 120)
+
def main(args: Array[String]): Unit = {
+
//Create the log dirs
try {
new File(LOG_DIR).mkdir()
+ new File(LOG_DIR + "/state/").mkdir()
new File(LOG_DIR + "/input/").mkdir()
new File(LOG_DIR + "/output_avg/").mkdir()
- new File(LOG_DIR + "/output_enriched/").mkdir()
new File(LOG_DIR + "/output_prediction/").mkdir()
} catch {
- case e: Exception => println("Directories already created")
+ case e: Exception => println("Directories already created" + e.getMessage)
}
// parse parameters
@@ -79,187 +88,132 @@ abstract class SensorEventAveragingJobBase extends Serializable {
// will be using the timestamp from the records
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ //env.enableCheckpointing(10000) // checkpoint every 10000 msecs
+ //env.setStateBackend(new FsStateBackend(LOG_DIR +"/state/"))
+
// Get the stream according to params
- val stream: DataStream[SensorEvent] = SourceChooser.from(env, params).name("Sensor Source")
+ val rawStream: DataStream[SensorEvent] = SourceChooser.from(env, params).name("Sensor Source with Timestamp")
- val withTimestamps = stream
- .assignTimestampsAndWatermarks(SensorEvent.tsAssigner())
- .name("Source with Timestamp")
+ // Create a Global Window for work values which will output the missing load values
+ val averageUsingWorkValues: DataStream[AverageWithKey] = rawStream
+ .filter(_.property == Constants.PROPERTY_WORK)
+ .keyBy(e => getKey(e))
+ .countWindow(2, 1)
+ .process(new WorkValueProcessWindow)
+ .flatMap(new WorkValueFlatMap(60)(keyGetter)) //60 seconds
// Create a stream with sum according to the key specified
- val initializedFlow = initializeFlow(withTimestamps)
+ val initializedFlow = if (params.has("deduplicate")) initializeFlow(rawStream) else rawStream
+
+ val averageWithKeys = initializedFlow
+ .map(e => AverageWithKey(keyGetter(e), Slice(Time.minutes(1))(e.timestamp), Average(e.value, 1)))
+ // Assumption: If both the Load values and work Values are available in a slice,
+ // Average of them will still be closer to the real measurement
+ .union(averageUsingWorkValues)
+
+ //create median states for various window duration
+ createMedianState(averageWithKeys)
+
+ // Create the average sliding window stream and corresponding Prediction Stream
+ val windowed_average_1min = createAverageStream(params, 1, averageWithKeys)
+ if (params.getBoolean("sink.1min", false)) {
+ val prediction_1min = createPredictionStream(params, 1, windowed_average_1min)
+ createPredictionSink(params, prediction_1min, Constants.ES_INDEX_TYPE_1MIN)
+ }
- // Streams for each window duration for the average
- val avg_windowed1min = initializedFlow
- .keyBy(keyGetter(_))
- .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(Constants.SLIDING_INTERVAL)))
- .aggregate(new AverageAggregateWithKey(keyGetter))
- .name(getKeyName() + "Average for 1 min Window")
+ val windowed_average_5min = createAverageStream(params, 5, windowed_average_1min)
+ if(params.getBoolean("sink.5min", false)) {
+ val prediction_5min = createPredictionStream(params, 5, windowed_average_5min)
+ createPredictionSink(params, prediction_5min, Constants.ES_INDEX_TYPE_5MIN)
+ }
- val avg_windowed5min = avg_windowed1min
- .keyBy(_.key)
- .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.seconds(Constants.SLIDING_INTERVAL)))
- .reduce(AverageWithKey.reducer)
- .name(getKeyName() + "Average for 5 min Window")
+ val windowed_average_15min = createAverageStream(params, 15, windowed_average_5min)
+ if(params.getBoolean("sink.15min", false)) {
+ val prediction_15min = createPredictionStream(params, 15, windowed_average_15min)
+ createPredictionSink(params, prediction_15min, Constants.ES_INDEX_TYPE_15MIN)
+ }
- val avg_windowed15min = avg_windowed5min
- .keyBy(_.key)
- .window(SlidingEventTimeWindows.of(Time.minutes(15), Time.seconds(Constants.SLIDING_INTERVAL)))
- .reduce(AverageWithKey.reducer)
- .name(getKeyName() + "Average for 15 min Window")
+ val windowed_average_60min = createAverageStream(params, 60, windowed_average_15min)
+ if(params.getBoolean("sink.60min", false)) {
+ val prediction_60min = createPredictionStream(params, 60, windowed_average_60min)
+ createPredictionSink(params, prediction_60min, Constants.ES_INDEX_TYPE_60MIN)
+ }
- val avg_windowed60min = avg_windowed15min
- .keyBy(_.key)
- .window(SlidingEventTimeWindows.of(Time.minutes(60), Time.seconds(Constants.SLIDING_INTERVAL)))
- .reduce(AverageWithKey.reducer)
- .name(getKeyName() + "Average for 60 min Window")
+ val windowed_average_120min = createAverageStream(params, 120, windowed_average_60min)
+ if(params.getBoolean("sink.120min", false)) {
+ val prediction_120min = createPredictionStream(params, 120, windowed_average_120min)
+ createPredictionSink(params, prediction_120min, Constants.ES_INDEX_TYPE_120MIN)
+ }
- val avg_windowed120min = avg_windowed60min
- .keyBy(_.key)
- .window(SlidingEventTimeWindows.of(Time.minutes(120), Time.seconds(Constants.SLIDING_INTERVAL)))
- .reduce(AverageWithKey.reducer)
- .name(getKeyName() + "Average for 120 min Window")
+ if(params.getBoolean("sink.raw", false))
+ initializedFlow
+ .addSink(ElasticSearchSink[SensorEvent](params, Constants.ES_INDEX_NAME, Constants.ES_INDEX_TYPE_RAW))
+ .name("Sensor Raw to ES")
- // Write to file for debug
- val debug = params.has("debug")
- if (debug) {
- initializedFlow.writeAsCsv(LOG_DIR + "/input/initializedFlow.csv", FileSystem.WriteMode.OVERWRITE).name("Debug Initialized Flow")
- avg_windowed1min.writeAsCsv(LOG_DIR + "/output_avg/windowed1min.csv", FileSystem.WriteMode.OVERWRITE).name("Debug Avg windowed 1 Min")
- avg_windowed5min.writeAsCsv(LOG_DIR + "/output_avg/windowed5min.csv", FileSystem.WriteMode.OVERWRITE).name("Debug Avg windowed 5 Min")
- avg_windowed15min.writeAsCsv(LOG_DIR + "/output_avg/windowed15min.csv", FileSystem.WriteMode.OVERWRITE).name("Debug Avg windowed 15 Min")
- avg_windowed60min.writeAsCsv(LOG_DIR + "/output_avg/windowed60min.csv", FileSystem.WriteMode.OVERWRITE).name("Debug Avg windowed 60 Min")
- avg_windowed120min.writeAsCsv(LOG_DIR + "/output_avg/windowed120min.csv", FileSystem.WriteMode.OVERWRITE).name("Debug Avg windowed 120 Min")
- }
+ env.execute("Sensor Event" + getKeyName() + " Prediction Job")
+ }
- // Write to HBase for each window duration
- avg_windowed1min.addSink(new OutputFormatSinkFunction[AverageWithKey](new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_1MIN, getTargetColumnFamily())))
- .name(getKeyName() + " Average - HBase - 1 Min Window")
-
- avg_windowed5min.addSink(new OutputFormatSinkFunction[AverageWithKey](new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_5MIN, getTargetColumnFamily())))
- .name(getKeyName() + " Average - HBase - 5 Min Window")
-
- avg_windowed15min.addSink(new OutputFormatSinkFunction[AverageWithKey](new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_15MIN, getTargetColumnFamily())))
- .name(getKeyName() + " Average - HBase - 15 Min Window")
-
- avg_windowed60min.addSink(new OutputFormatSinkFunction[AverageWithKey](new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_60MIN, getTargetColumnFamily())))
- .name(getKeyName() + " Average - HBase - 60 Min Window")
-
- avg_windowed120min.addSink(new OutputFormatSinkFunction[AverageWithKey](new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_120MIN, getTargetColumnFamily())))
- .name(getKeyName() + " Average - HBase - 120 Min Window")
-
- val timeout = params.getLong("timeout", 3000000L) // 300 seconds timeout
-
- // Enrich the average calculation with the median value for each stream of different window duration
- val windowed1min_enriched: DataStream[(AverageWithKey, MedianLoad)] = AsyncDataStream.orderedWait(
- avg_windowed1min,
- new HBaseAsyncFunction(Constants.TABLE_1MIN, getTargetColumnFamily()),
- timeout,
- TimeUnit.MILLISECONDS,
- 1
- )
- .startNewChain()
- .name(getKeyName() + " Enriched - 1 Min Window")
-
- val windowed5min_enriched: DataStream[(AverageWithKey, MedianLoad)] = AsyncDataStream.orderedWait(
- avg_windowed5min,
- new HBaseAsyncFunction(Constants.TABLE_5MIN, getTargetColumnFamily()),
- timeout,
- TimeUnit.MILLISECONDS,
- 1
- )
- .startNewChain()
- .name(getKeyName() + " Enriched - 5 Min Window")
-
- val windowed15min_enriched: DataStream[(AverageWithKey, MedianLoad)] = AsyncDataStream.orderedWait(
- avg_windowed15min,
- new HBaseAsyncFunction(Constants.TABLE_15MIN, getTargetColumnFamily()),
- timeout,
- TimeUnit.MILLISECONDS,
- 1
- )
- .startNewChain()
- .name(getKeyName() + " Enriched - 15 Min Window")
-
- val windowed60min_enriched: DataStream[(AverageWithKey, MedianLoad)] = AsyncDataStream.orderedWait(
- avg_windowed60min,
- new HBaseAsyncFunction(Constants.TABLE_60MIN, getTargetColumnFamily()),
- timeout,
- TimeUnit.MILLISECONDS,
- 1
- )
- .startNewChain()
- .name(getKeyName() + " Enriched - 60 Min Window")
-
- val windowed120min_enriched: DataStream[(AverageWithKey, MedianLoad)] = AsyncDataStream.orderedWait(
- avg_windowed120min,
- new HBaseAsyncFunction(Constants.TABLE_120MIN, getTargetColumnFamily()),
- timeout,
- TimeUnit.MILLISECONDS,
- 1
- )
- .startNewChain()
- .name(getKeyName() + " Enriched - 120 Min Window")
-
- if (debug) {
- windowed1min_enriched.writeAsCsv(LOG_DIR + "/output_enriched/windowed1min.csv", FileSystem.WriteMode.OVERWRITE).name("Debug Enriched 1 Min Window")
- windowed5min_enriched.writeAsCsv(LOG_DIR + "/output_enriched/windowed5min.csv", FileSystem.WriteMode.OVERWRITE).name("Debug Enriched 5 Min Window")
- windowed15min_enriched.writeAsCsv(LOG_DIR + "/output_enriched/windowed15min.csv", FileSystem.WriteMode.OVERWRITE).name("Debug Enriched 15 Min Window")
- windowed60min_enriched.writeAsCsv(LOG_DIR + "/output_enriched/windowed60min.csv", FileSystem.WriteMode.OVERWRITE).name("Debug Enriched 60 Min Window")
- windowed120min_enriched.writeAsCsv(LOG_DIR + "/output_enriched/windowed120min.csv", FileSystem.WriteMode.OVERWRITE).name("Debug Enriched 120 Min Window")
- }
+ /**
+ * Helper function to create median MapState for each window duration
+ * @param averageWithKeyStream DataStream of AverageWithKey
+ */
+ def createMedianState(averageWithKeyStream: DataStream[AverageWithKey]): Unit = {
- // Create the predicted value streams
- val windowed1min_prediction = windowed1min_enriched
- .map(new PredictionFunction(entity = getKeyName(), slidingWindow = Time.minutes(1).toMilliseconds))
+ // Streams for each window duration for the average
+ windowDurations.foreach(duration => {
- val windowed5min_prediction = windowed5min_enriched
- .map(new PredictionFunction(entity = getKeyName(), slidingWindow = Time.minutes(5).toMilliseconds))
+ val stateName = getStateName(duration)
- val windowed15min_prediction = windowed15min_enriched
- .map(new PredictionFunction(entity = getKeyName(), slidingWindow = Time.minutes(15).toMilliseconds))
+ val avg_windowed = averageWithKeyStream
+ .keyBy(_.key)
+ .window(TumblingEventTimeWindows.of(Time.minutes(duration)))
+ .reduce(new AverageWithKeyReducer)
+ .name(getKeyName() + " Average for " + duration + " min Tumbling Window")
- val windowed60min_prediction = windowed60min_enriched
- .map(new PredictionFunction(entity = getKeyName(), slidingWindow = Time.minutes(60).toMilliseconds))
+ // Store median as operator state
+ avg_windowed
+ .keyBy(_.key)
+ .flatMap(new MedianWithKeyMapper(stateName))
+ .name(getKeyName() + " Median state for " + duration + " min Tumbling Window")
- val windowed120min_prediction = windowed120min_enriched
- .map(new PredictionFunction(entity = getKeyName(), slidingWindow = Time.minutes(120).toMilliseconds))
+ })
+ }
- // Sink the Predicted value streams to Elasticsearch
- windowed1min_prediction
- .addSink(PredictionElasticSearchSink(params,Constants.ES_INDEX_NAME, Constants.ES_INDEX_TYPE_1MIN))
- .name(getKeyName() + " Prediction Sink - ES - 1 min Window")
-
- windowed5min_prediction
- .addSink(PredictionElasticSearchSink(params,Constants.ES_INDEX_NAME, Constants.ES_INDEX_TYPE_5MIN))
- .name(getKeyName() + " Prediction Sink - ES - 5 min Window")
-
- windowed15min_prediction
- .addSink(PredictionElasticSearchSink(params,Constants.ES_INDEX_NAME, Constants.ES_INDEX_TYPE_15MIN))
- .name(getKeyName() + " Prediction Sink - ES - 15 min Window")
-
- windowed60min_prediction
- .addSink(PredictionElasticSearchSink(params,Constants.ES_INDEX_NAME, Constants.ES_INDEX_TYPE_60MIN))
- .name(getKeyName() + " Prediction Sink - ES - 60 min Window")
-
- windowed120min_prediction
- .addSink(PredictionElasticSearchSink(params,Constants.ES_INDEX_NAME, Constants.ES_INDEX_TYPE_120MIN))
- .name(getKeyName() + " Prediction Sink - ES - 120 min Window")
-
- if (debug) {
- windowed1min_prediction.writeAsCsv(LOG_DIR + "/output_prediction/windowed1min.csv", FileSystem.WriteMode.OVERWRITE)
- windowed5min_prediction.writeAsCsv(LOG_DIR + "/output_prediction/windowed5min.csv", FileSystem.WriteMode.OVERWRITE)
- windowed15min_prediction.writeAsCsv(LOG_DIR + "/output_prediction/windowed15min.csv", FileSystem.WriteMode.OVERWRITE)
- windowed60min_prediction.writeAsCsv(LOG_DIR + "/output_prediction/windowed60min.csv", FileSystem.WriteMode.OVERWRITE)
- windowed120min_prediction.writeAsCsv(LOG_DIR + "/output_prediction/windowed120min.csv", FileSystem.WriteMode.OVERWRITE)
- }
+ def createAverageStream(params: ParameterTool, duration: Int, sourceStream: DataStream[AverageWithKey]): DataStream[AverageWithKey] = {
+
+ val slidingInterval = params.getInt("sliding.interval", Constants.SLIDING_INTERVAL)
+
+ val windowed_average = sourceStream
+ .keyBy(_.key)
+ // Map the Correct Slice duration
+ .map(e => AverageWithKey(e.key, Slice(Time.minutes(duration))(e.slice.timestamp), e.average))
+ .keyBy(_.key)
+ .window(SlidingEventTimeWindows.of(Time.minutes(duration), Time.seconds(slidingInterval)))
+ .reduce(new AverageWithKeyReducer)
- //Sink the original feed into ES as well
- initializedFlow
- .addSink(SensorEventElasticSearchSink(params, Constants.ES_INDEX_NAME, Constants.ES_INDEX_TYPE_RAW))
- .name("Sensor Raw to ES")
+ windowed_average
+ }
- env.execute("Sensor Event" + getKeyName() + " Prediction Job (Kafka to HBase Averages + Prediction to ES)")
+ def createPredictionStream(params: ParameterTool, duration: Int, averageStream: DataStream[AverageWithKey]): DataStream[Prediction] = {
+ val windowed_prediction: DataStream[Prediction] = averageStream
+ .keyBy(_.key)
+ .flatMap(new EnrichMapper(getStateName(duration)))
+ .name(getKeyName() + " Prediction values for " + duration + " min")
+
+ windowed_prediction
+ }
+ def createPredictionSink(params: ParameterTool, windowed_prediction: DataStream[Prediction], indexType: String) = {
+ // Sink the Predicted value streams to Elasticsearch
+ windowed_prediction
+ .addSink(ElasticSearchSink[Prediction](params, Constants.ES_INDEX_NAME, indexType))
+ .name(getKeyName() + " Prediction Sink - ES - " + indexType + " min Window")
+
+ // Write to file for debug
+ if(params.has("debug")){
+ windowed_prediction.writeAsText(LOG_DIR + "/output_prediction/windowed"+ indexType +".csv", FileSystem.WriteMode.OVERWRITE)
+ }
}
+ def getStateName(duration: Int): String = "median-" + duration + "min"
}
\ No newline at end of file
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/job/HouseAveragingJob.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/job/HouseAveragingJob.scala
index 9254f60..f2a83cf 100644
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/job/HouseAveragingJob.scala
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/job/HouseAveragingJob.scala
@@ -12,7 +12,7 @@ object HouseAveragingJob extends SensorEventAveragingJobBase with Serializable{
override def getTargetColumnFamily(): String = Constants.HOUSE_CF
- override def initializeFlow(dataStream: DataStream[SensorEvent]) = {
+ override def initializeFlow(dataStream: DataStream[SensorEvent]): DataStream[SensorEvent] = {
// Sum the values of all the plugs in a house with the same time stamp
dataStream
.keyBy("house_id", "timestamp")
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/job/PlugAveragingJob.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/job/PlugAveragingJob.scala
index 189ba10..8e7671e 100644
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/job/PlugAveragingJob.scala
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/job/PlugAveragingJob.scala
@@ -12,11 +12,12 @@ object PlugAveragingJob extends SensorEventAveragingJobBase with Serializable{
override def getTargetColumnFamily(): String = Constants.PLUG_CF
- override def initializeFlow(dataStream: DataStream[SensorEvent]) = {
- // Sum all the multiple values with the same timestamp for a given plug of a given house
+ override def initializeFlow(dataStream: DataStream[SensorEvent]): DataStream[SensorEvent] = {
+ // De-duplicate values with the same timestamp for a given plug of a given house
dataStream
- .keyBy("house_id", "plug_id", "timestamp")
- .sum("value")
- .name("Aggregated by Plug Data")
+ .filter(_.property == Constants.PROPERTY_LOAD)
+ .keyBy("house_id", "household_id" , "plug_id", "timestamp")
+ .reduce((_,b) => b)
+ .name("De-duplicated Raw stream")
}
}
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/job/PlugAveragingJob5Min.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/job/PlugAveragingJob5Min.scala
new file mode 100644
index 0000000..378715c
--- /dev/null
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/job/PlugAveragingJob5Min.scala
@@ -0,0 +1,21 @@
+package com.bhaskardivya.projects.smartgrid.job
+
+import com.bhaskardivya.projects.smartgrid.base.PredictionJobSingleWindowBase
+import com.bhaskardivya.projects.smartgrid.model.{Constants, SensorEvent, SensorKeyObject}
+import org.apache.flink.streaming.api.scala.DataStream
+
+object PlugAveragingJob5Min extends PredictionJobSingleWindowBase with Serializable{
+
+ override def getKeyName(): String = "Plug"
+
+ override def getKey(element: SensorEvent): SensorKeyObject = SensorKeyObject(element.house_id, element.household_id, element.plug_id)
+
+ override def initializeFlow(dataStream: DataStream[SensorEvent]): DataStream[SensorEvent] = {
+ // De-duplicate values with the same timestamp for a given plug of a given house
+ dataStream
+ .filter(_.property == Constants.PROPERTY_LOAD)
+ .keyBy("house_id", "household_id" , "plug_id", "timestamp")
+ .reduce((_,b) => b)
+ .name("De-duplicated Raw stream")
+ }
+}
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Average.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Average.scala
index 43edfa9..4c38f65 100644
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Average.scala
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Average.scala
@@ -1,5 +1,12 @@
package com.bhaskardivya.projects.smartgrid.model
+import org.apache.flink.streaming.api.scala._
+
+/**
+ * Class to hold the sum and count
+ * @param sum Aggregation of load values
+ * @param count Number of load values
+ */
case class Average(var sum: Double, var count: Long){
def add(that: Average): Average = {
Average(this.sum + that.sum, this.count + that.count)
@@ -8,4 +15,8 @@ case class Average(var sum: Double, var count: Long){
def +(that: Average): Average = {
this.add(that)
}
+
+ def avg: Double = {
+ sum/count
+ }
}
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/AverageWithKey.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/AverageWithKey.scala
index 154f69e..d3bcef7 100644
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/AverageWithKey.scala
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/AverageWithKey.scala
@@ -1,27 +1,28 @@
package com.bhaskardivya.projects.smartgrid.model
import com.gotometrics.orderly.DoubleWritableRowKey
+import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.hadoop.io.DoubleWritable
+import org.apache.flink.streaming.api.scala._
-case class AverageWithKey(var key: SensorKeyObject, var sum: Double, var count: Long, eventTimestamp: Long = Constants.KEY_NO_VALUE){
- val averageValue: Double = sum / count
+case class AverageWithKey(var key: SensorKeyObject, var slice: Slice, average: Average){
+ def averageValue = average.avg
def add(that: AverageWithKey): AverageWithKey = {
- // To ensure that we do no propagate "-1" (Constants.KEY_NO_VALUE) (initial accumulator value)
- if(this.key.house_id > that.key.house_id)
- AverageWithKey(this.key, this.sum + that.sum, this.count + that.count, Math.max(this.eventTimestamp, that.eventTimestamp))
- else
- AverageWithKey(that.key, this.sum + that.sum, this.count + that.count, Math.max(this.eventTimestamp, that.eventTimestamp))
+ AverageWithKey(this.key, this.slice, this.average + that.average)
}
def +(that: AverageWithKey): AverageWithKey = {
this.add(that)
}
+ /* HBase related functions start */
+ @deprecated
def toHBaseColumnName(): String = {
- key.toColumnString()
+ key.toColumnString
}
+ @deprecated
def bytesRowKey(): Array[Byte] = {
// Original Object that will be serialized
val rowkeyVal: DoubleWritable = new DoubleWritable(toHBaseColumnValue().asInstanceOf[java.lang.Double])
@@ -34,17 +35,19 @@ case class AverageWithKey(var key: SensorKeyObject, var sum: Double, var count:
* Otherwise, DoubleColumnInterpreter will not be able to read column value
* @return
*/
+ @deprecated
def toHBaseColumnValue(): Double = {
//this.sum + Constants.DELIMITER + this.count
averageValue
}
+ @deprecated
def toHBaseLongColumnValue(): Long = {
//this.sum + Constants.DELIMITER + this.count
(averageValue * 1000).toLong
}
- def fromHBaseColumnValue(column: String, value: String): AverageWithKey = {
+ /*def fromHBaseColumnValue(column: String, value: String): AverageWithKey = {
//val fields = value.split(Constants.DELIMITER)
//AverageWithKey(column, fields(0).toDouble, fields(2).toLong)
var sumValue = 0.0
@@ -54,11 +57,15 @@ case class AverageWithKey(var key: SensorKeyObject, var sum: Double, var count:
case e: Exception => sumValue = 0.0
}
AverageWithKey(SensorKeyObject.fromColumnString(column), sumValue, 1, Constants.KEY_NO_VALUE)
- }
+ }*/
}
object AverageWithKey {
def reducer = {
(a: AverageWithKey, b: AverageWithKey) => a+b
}
+
+ def getInitialValue(): AverageWithKey = {
+ new AverageWithKey(SensorKeyObject(-1), Slice(Time.milliseconds(-1))(-1), Average(0.0, 0) )
+ }
}
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Constants.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Constants.scala
index 31e3755..4a36a72 100644
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Constants.scala
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Constants.scala
@@ -29,4 +29,13 @@ object Constants {
// Sensor Key Object
val KEY_NO_VALUE: Long = -1
+ val PROPERTY_LOAD: Int = 1
+ val PROPERTY_WORK: Int = 0
+
+ // TDigest
+ val TDIGEST_COMPRESSION = 100
+
+ // This is the base timestamp ... the epoch of the world with flink
+ val BASE_TIMESTAMP = 1377900000L // TODO: currently set 0 to align with Unix Epoch
+
}
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/MedianLoadWithKey.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/MedianLoadWithKey.scala
new file mode 100644
index 0000000..953820b
--- /dev/null
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/MedianLoadWithKey.scala
@@ -0,0 +1,32 @@
+package com.bhaskardivya.projects.smartgrid.model
+
+import com.tdunning.math.stats.TDigest
+
+case class MedianLoadWithKey(var key: SensorKeyObject, slice: Slice, digest: TDigest){
+ def medianLoad : Double = digest.quantile(0.5)
+
+ def add(averageWithKey: AverageWithKey) = {
+ this.digest.add(averageWithKey.averageValue)
+ this
+ }
+
+ def add(other: MedianLoadWithKey) = {
+ this.digest.add(other.digest)
+ this
+ }
+
+ def +(other: MedianLoadWithKey) = add(other)
+}
+
+object MedianLoadWithKey {
+ def reducer = {
+ (a: MedianLoadWithKey, b: MedianLoadWithKey) => a+b
+ }
+
+ def fromAverageWithKey(averageWithKey: AverageWithKey) = {
+ val tDigest: TDigest = TDigest.createDigest(Constants.TDIGEST_COMPRESSION)
+ tDigest.add(averageWithKey.averageValue)
+
+ MedianLoadWithKey(averageWithKey.key, averageWithKey.slice, tDigest)
+ }
+}
\ No newline at end of file
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Prediction.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Prediction.scala
index cf49348..9cddc00 100644
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Prediction.scala
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Prediction.scala
@@ -1,14 +1,12 @@
package com.bhaskardivya.projects.smartgrid.model
-import java.util.Date
-
+import com.bhaskardivya.projects.smartgrid.util.JSONTrait
import org.apache.sling.commons.json.JSONObject
-
-case class Prediction(averageWithKey: AverageWithKey, medianLoad: MedianLoad, key: String, slidingWindow: Long, predictedValue: Double){
+case class Prediction(var averageWithKey: AverageWithKey, var medianLoad: MedianLoad, var predictedLoad: Double) extends JSONTrait {
def toJSONString(): String = {
- toJSON().toString()
+ toJSON2().toString()
}
def toJSON(): JSONObject = {
@@ -17,29 +15,73 @@ case class Prediction(averageWithKey: AverageWithKey, medianLoad: MedianLoad, ke
//averageWithKey object
val averageWithKeyJSON = averageWithKey.key.toJSON
- averageWithKeyJSON.put("sum", averageWithKey.sum)
- averageWithKeyJSON.put("count", averageWithKey.count)
- averageWithKeyJSON.put("avg", averageWithKey.averageValue)
- averageWithKeyJSON.put("eventTimestamp", averageWithKey.eventTimestamp)
+ averageWithKeyJSON.put("sum", normalise_double(averageWithKey.average.sum))
+ averageWithKeyJSON.put("count", averageWithKey.average.count)
+ averageWithKeyJSON.put("avg", normalise_double(averageWithKey.averageValue))
+ averageWithKeyJSON.put("eventTimestamp", averageWithKey.slice.timestamp)
json.put("averageWithKey", averageWithKeyJSON)
- //mediaLoad
+ //medianLoad
val medianLoadJSON = new JSONObject()
- medianLoadJSON.put("load", medianLoad.load)
+ medianLoadJSON.put("load", normalise_double(medianLoad.load))
json.put("medianLoad", medianLoadJSON)
//key or entity
- json.put("key", key)
+ json.put("house_id", averageWithKey.key.house_id)
+ json.put("household_id", averageWithKey.key.household_id)
+ json.put("plug_id", averageWithKey.key.plug_id)
+
+ //sliding window duration
+ json.put("slidingWindowDuration", averageWithKey.slice.size.toMilliseconds)
+ json.put("slice-start", averageWithKey.slice.ts_start)
+ json.put("slice-stop", averageWithKey.slice.ts_stop)
+
+ //Predicted value
+ json.put("predictedValue", normalise_double(predictedLoad))
+
+ // Current Time
+ json.put("current-timestamp", System.currentTimeMillis)
+
+ json
+ }
+
+ def normalise_double(dbl: Double): Double = {
+ if(dbl < 1e-6) {
+ 0.000001
+ }else{
+ dbl
+ }
+ }
+
+ def toJSON2(): JSONObject = {
+ //main object
+ val json = new JSONObject()
+
+ //averageWithKey object
+ val averageWithKeyJSON = averageWithKey.key.toJSON
+ averageWithKeyJSON.put("sum", normalise_double(averageWithKey.average.sum))
+ averageWithKeyJSON.put("count", averageWithKey.average.count)
+ averageWithKeyJSON.put("avg", normalise_double(averageWithKey.averageValue))
+ averageWithKeyJSON.put("eventTimestamp", averageWithKey.slice.timestamp)
+ json.put("averageWithKey", averageWithKeyJSON)
+
+ //medianLoad
+ val medianLoadJSON = new JSONObject()
+ medianLoadJSON.put("load", normalise_double(medianLoad.load))
+ json.put("medianLoad", medianLoadJSON)
//sliding window duration
- json.put("slidingWindowDuration", slidingWindow)
+ json.put("slidingWindowDuration", averageWithKey.slice.size.toMilliseconds)
+ json.put("slice-start", averageWithKey.slice.ts_start)
+ json.put("slice-stop", averageWithKey.slice.ts_stop)
//Predicted value
- json.put("predictedValue", predictedValue)
+ json.put("predictedValue", normalise_double(predictedLoad))
+ json.put("predicted-slice-start", averageWithKey.slice.predicting_for_slice.ts_start)
// Current Time
- json.put("timestamp", new Date().getTime)
+ json.put("current-timestamp", System.currentTimeMillis)
json
}
-}
\ No newline at end of file
+}
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/SensorEvent.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/SensorEvent.scala
index bcc7fab..b839a40 100644
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/SensorEvent.scala
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/SensorEvent.scala
@@ -1,5 +1,6 @@
package com.bhaskardivya.projects.smartgrid.model
+import com.bhaskardivya.projects.smartgrid.util.JSONTrait
import org.apache.commons.lang3.builder.ToStringBuilder
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema
@@ -13,9 +14,9 @@ import org.apache.sling.commons.json.JSONObject
/**
* id, timestamp, value, property, plug_id, household_id, house_id
*/
-case class SensorEvent(var id: Long,var timestamp: Long, var value: Double, var property: Int, var plug_id: Long, var household_id: Long, var house_id: Long){
+case class SensorEvent(var id: Long,var timestamp: Long, var value: Double, var property: Int, var plug_id: Long, var household_id: Long, var house_id: Long) extends JSONTrait {
- def adjustEventTimestamp(millis: Long) = {
+ def adjustEventTimestamp(millis: Long): Unit = {
this.timestamp = this.timestamp + (millis / 1000)
}
@@ -30,7 +31,7 @@ case class SensorEvent(var id: Long,var timestamp: Long, var value: Double, var
val json: JSONObject = new JSONObject()
json.put("id", this.id)
json.put("timestamp", this.timestamp)
- json.put("value", this.value)
+ json.put("value", normalise_double(this.value))
json.put("property", this.property)
json.put("plug_id", this.plug_id)
json.put("household_id", this.household_id)
@@ -38,6 +39,14 @@ case class SensorEvent(var id: Long,var timestamp: Long, var value: Double, var
json
}
+
+ def normalise_double(dbl: Double): Double = {
+ if(dbl < 1e-6) {
+ 0.000001
+ }else{
+ dbl
+ }
+ }
}
object SensorEvent {
@@ -56,7 +65,7 @@ object SensorEvent {
new SensorEvent(id, timestamp, value, property, plug_id, household_id, house_id)
} catch {
- case e: Exception => null
+ case _ : Throwable => null
}
}
@@ -69,7 +78,7 @@ object SensorEvent {
def tsAssigner(): BoundedOutOfOrdernessTimestampExtractor[SensorEvent] = {
new BoundedOutOfOrdernessTimestampExtractor[SensorEvent](MAX_DELAY) {
- override def extractTimestamp(element: SensorEvent) = element.getTimeMillis()
+ override def extractTimestamp(element: SensorEvent): Long = element.getTimeMillis()
}
}
}
\ No newline at end of file
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/SensorKeyObject.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/SensorKeyObject.scala
index 1fbce6d..ffed820 100644
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/SensorKeyObject.scala
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/SensorKeyObject.scala
@@ -1,12 +1,13 @@
package com.bhaskardivya.projects.smartgrid.model
import org.apache.sling.commons.json.JSONObject
+import org.apache.flink.streaming.api.scala._
class SensorKeyObject(var house_id: Long, var household_id: Long, var plug_id: Long) extends Serializable{
- override def toString: String = this.toColumnString()
+ override def toString: String = this.toColumnString
- def toColumnString(): String = {
+ def toColumnString: String = {
val str: StringBuilder = new StringBuilder
if(this.house_id > Constants.KEY_NO_VALUE)
@@ -23,11 +24,9 @@ class SensorKeyObject(var house_id: Long, var household_id: Long, var plug_id: L
str.toString()
}
- def toJSONString(): String = {
- toJSON().toString
- }
+ def toJSONString: String = toJSON.toString
- def toJSON(): JSONObject = {
+ def toJSON: JSONObject = {
val json: JSONObject = new JSONObject()
if(this.house_id > Constants.KEY_NO_VALUE)
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Slice.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Slice.scala
new file mode 100644
index 0000000..ef991bd
--- /dev/null
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Slice.scala
@@ -0,0 +1,71 @@
+package com.bhaskardivya.projects.smartgrid.model
+
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.scala._
+
+import scala.collection.immutable
+
+/**
+ * Class to represent a slice index of a size (aka window size)
+ *
+ * @param size Window size
+ * @param timestamp Event timestamp from the record in seconds
+ */
+case class Slice(size: Time)(var timestamp: Long) {
+ private val seconds_in_Day = 24 * 60 * 60
+
+
+ def size_in_seconds: Long = size.toMilliseconds / 1000
+
+ def ts_start: Long = timestamp - (timestamp % size_in_seconds)
+
+ def ts_stop: Long = ts_start + size_in_seconds - 1
+
+ def i: Long = ( ts_start - Constants.BASE_TIMESTAMP ) / size_in_seconds
+
+ def num_slices_in(hr: Long): Long = (hr * 60 * 60) / size_in_seconds
+
+ def num_slices_in_day: Long = num_slices_in(24)
+
+ def j: immutable.IndexedSeq[Long] = {
+ val k = num_slices_in_day
+ (1L to ((i+2)/k)).map(n => (i+2 - n*k))
+ }
+
+ def start_time_of_day: Long = ts_start % seconds_in_Day
+ def stop_time_of_day: Long = ts_stop % seconds_in_Day
+
+ def predicting_for_time_of_day: Long = (start_time_of_day + 2*size_in_seconds) % seconds_in_Day
+ def predicting_previous_slice: Long = (start_time_of_day - 2*size_in_seconds + seconds_in_Day) % seconds_in_Day
+ def predicting_for_slice: Slice = Slice(size)(ts_start + 2*size_in_seconds)
+
+ override def toString : String = {
+ val str: StringBuilder = new StringBuilder
+
+ str.append(size.toMilliseconds.toString)
+ str.append(Constants.DELIMITER)
+ str.append(ts_start.toString)
+ str.append(Constants.DELIMITER)
+ str.append(ts_stop.toString)
+
+ str.toString()
+ }
+}
+
+object Slice {
+ def from(size: Time)(sliceIndex: Long): Slice = {
+ val size_in_seconds = size.toMilliseconds / 1000
+
+ val ts_start = sliceIndex * size_in_seconds + Constants.BASE_TIMESTAMP
+
+ //lets keep the event occurring just right in the middle of the slice index
+ val event_timestamp = ts_start + (size_in_seconds / 2)
+
+ Slice(size)(event_timestamp)
+ }
+
+ def from(size_in_seconds: Long)(sliceIndex: Long) : Slice = {
+ from(Time.seconds(size_in_seconds))(sliceIndex)
+ }
+
+}
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/TwoWorkEvents.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/TwoWorkEvents.scala
new file mode 100644
index 0000000..456cbb2
--- /dev/null
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/TwoWorkEvents.scala
@@ -0,0 +1,5 @@
+package com.bhaskardivya.projects.smartgrid.model
+
+case class TwoWorkEvents(sensorEvent1: SensorEvent, sensorEvent2: SensorEvent) {
+ def isValid: Boolean = sensorEvent1 != null && sensorEvent2 != null
+}
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/WorkValueFlatMap.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/WorkValueFlatMap.scala
new file mode 100644
index 0000000..bcae404
--- /dev/null
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/WorkValueFlatMap.scala
@@ -0,0 +1,58 @@
+package com.bhaskardivya.projects.smartgrid.model
+
+import com.bhaskardivya.projects.smartgrid.base.AbstractKeyGetter
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.util.Collector
+
+class WorkValueFlatMap(size_in_seconds: Long)(keyGetter: AbstractKeyGetter) extends FlatMapFunction[TwoWorkEvents, AverageWithKey]{
+ override def flatMap(value: TwoWorkEvents, out: Collector[AverageWithKey]): Unit = {
+
+ if(!value.isValid)
+ return
+
+ // TwoWorkEvents have records with the same key
+ // Find the slices between the two WorkEvents
+ // Generate AverageWithKey records for each of the slices
+
+ val slice_range: Seq[Long] = getSliceRange(value)
+
+ val averageLoad = getAverageLoad(value)
+
+ // let's pre-create the objects so that we dont need to create it again for each collect call
+ val sensorKeyObject = keyGetter(value.sensorEvent1)
+ val average = Average(averageLoad, 1)
+
+
+ slice_range.foreach (
+ slice_index => {
+ val averageWithKey = AverageWithKey(sensorKeyObject, Slice.from(size_in_seconds)(slice_index), average)
+ out.collect(averageWithKey)
+ }
+ )
+
+ }
+
+ def getSliceRange(value: TwoWorkEvents) = {
+ val slice_range_a = Slice(Time.seconds(size_in_seconds))(value.sensorEvent1.timestamp).i
+ val slice_range_b = Slice(Time.seconds(size_in_seconds))(value.sensorEvent2.timestamp).i
+
+ if(slice_range_a > slice_range_b){
+ slice_range_b to slice_range_a
+ } else {
+ slice_range_a to slice_range_b
+ }
+ }
+
+ def getAverageLoad(value: TwoWorkEvents) = {
+ //in kWH
+ val work_diff = Math.abs(value.sensorEvent1.value - value.sensorEvent2.value) * 1000
+
+ //in seconds
+ val time_diff = Math.abs(value.sensorEvent1.timestamp - value.sensorEvent2.timestamp)
+
+ val averageLoad = (work_diff.toDouble * 60 * 60) / time_diff
+
+ averageLoad
+ }
+}
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/AverageAggregateWithKey.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/AverageAggregateWithKey.scala
deleted file mode 100644
index dbace07..0000000
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/AverageAggregateWithKey.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-package com.bhaskardivya.projects.smartgrid.operators
-
-import com.bhaskardivya.projects.smartgrid.base.AbstractKeyGetter
-import com.bhaskardivya.projects.smartgrid.model.{AverageWithKey, Constants, SensorEvent, SensorKeyObject}
-import org.apache.flink.api.common.functions.AggregateFunction
-
-/**
- * The accumulator is used to keep a running sum and a count. The [getResult] method
- * computes the average.
- */
-class AverageAggregateWithKey(keyGetter: AbstractKeyGetter) extends AggregateFunction[SensorEvent, AverageWithKey, AverageWithKey]{
-
- override def createAccumulator(): AverageWithKey = AverageWithKey(SensorKeyObject(-1), 0.0, 0L, Constants.KEY_NO_VALUE)
-
- override def add(value: SensorEvent, accumulator: AverageWithKey): AverageWithKey =
- AverageWithKey(
- keyGetter(value),
- accumulator.sum + value.value,
- accumulator.count + 1L,
- value.timestamp
- )
-
- override def getResult(accumulator: AverageWithKey): AverageWithKey = accumulator
-
- override def merge(a: AverageWithKey, b: AverageWithKey): AverageWithKey = a + b
-}
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/AverageWithKeyReducer.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/AverageWithKeyReducer.scala
new file mode 100644
index 0000000..0430f3e
--- /dev/null
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/AverageWithKeyReducer.scala
@@ -0,0 +1,11 @@
+package com.bhaskardivya.projects.smartgrid.operators
+
+import com.bhaskardivya.projects.smartgrid.model._
+import org.apache.flink.api.common.functions.ReduceFunction
+
+/**
+ * The reducer function for AverageWithKey object having same keys
+ */
+class AverageWithKeyReducer extends ReduceFunction[AverageWithKey]{
+ override def reduce(a: AverageWithKey, b: AverageWithKey): AverageWithKey = AverageWithKey.reducer(a,b)
+}
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/EnrichMapper.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/EnrichMapper.scala
new file mode 100644
index 0000000..ae24aaa
--- /dev/null
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/EnrichMapper.scala
@@ -0,0 +1,59 @@
+package com.bhaskardivya.projects.smartgrid.operators
+
+import com.bhaskardivya.projects.smartgrid.model.{AverageWithKey, MedianLoad, MedianLoadWithKey, Prediction}
+import com.tdunning.math.stats.TDigest
+import org.apache.flink.api.common.functions.{RichFlatMapFunction, RichReduceFunction}
+import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor}
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.scala.createTypeInformation
+import org.apache.flink.util.Collector
+
+/**
+ * Rich Mapper function to get the predicted load values from the median state and Average Loads
+ */
+class EnrichMapper(stateName: String) extends RichFlatMapFunction[AverageWithKey, Prediction]{
+
+ private var digest: MapState[Long, TDigest] = _
+ private var prediction2: Prediction = _
+
+ override def open(parameters: Configuration): Unit = {
+ val descriptor = new MapStateDescriptor[Long, TDigest](stateName, createTypeInformation[Long], createTypeInformation[TDigest])
+ digest = getRuntimeContext.getMapState(descriptor)
+ }
+
+ override def flatMap(value: AverageWithKey, out: Collector[Prediction]): Unit = {
+ // Get the TDigest Object for the SensorKey and slice predicting for
+ val currentDigest = digest.get(value.slice.predicting_for_time_of_day)
+
+ // Get the Median Load Value
+ val medianLoad =
+ if(currentDigest == null) {
+ val previousDigest = digest.get(value.slice.predicting_previous_slice)
+ if(previousDigest == null)
+ value.averageValue
+ else
+ previousDigest.quantile(0.5)
+ }else{
+ currentDigest.quantile(0.5)
+ }
+
+ // Calculate the load prediction
+ val prediction = (value.averageValue + medianLoad) / 2.0
+
+ // Update the Slice index for prediction
+ //TODO: Testing //value.slice = value.slice.predicting_for_slice
+
+ // Create the final Prediction Object to be collected
+ if (prediction2 == null) {
+ prediction2 = Prediction(value, MedianLoad(medianLoad), prediction)
+ } else {
+ prediction2.averageWithKey = value
+ prediction2.medianLoad = MedianLoad(medianLoad)
+ prediction2.predictedLoad = prediction
+ }
+
+ if(prediction > 1e-6) //
+ out.collect(prediction2)
+ }
+}
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/MedianAggregateWithKey.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/MedianAggregateWithKey.scala
new file mode 100644
index 0000000..ebe47d3
--- /dev/null
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/MedianAggregateWithKey.scala
@@ -0,0 +1,17 @@
+/*
+package com.bhaskardivya.projects.smartgrid.operators
+
+import com.bhaskardivya.projects.smartgrid.model.{AverageWithKey, MedianLoadWithKey, SensorKeyObject}
+import com.tdunning.math.stats.TDigest
+import org.apache.flink.api.common.functions.AggregateFunction
+
+class MedianAggregateWithKey extends AggregateFunction[AverageWithKey, MedianLoadWithKey, MedianLoadWithKey]{
+ override def createAccumulator() = new MedianLoadWithKey(SensorKeyObject(-1), TDigest.createDigest(100))
+
+ override def add(value: AverageWithKey, accumulator: MedianLoadWithKey) = accumulator.add(averageWithKey = value)
+
+ override def getResult(accumulator: MedianLoadWithKey) = accumulator
+
+ override def merge(a: MedianLoadWithKey, b: MedianLoadWithKey) = a+b
+}
+*/
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/MedianWithKeyMapper.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/MedianWithKeyMapper.scala
new file mode 100644
index 0000000..5adbd02
--- /dev/null
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/MedianWithKeyMapper.scala
@@ -0,0 +1,39 @@
+package com.bhaskardivya.projects.smartgrid.operators
+
+import com.bhaskardivya.projects.smartgrid.model._
+import com.tdunning.math.stats.TDigest
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.util.Collector
+
+/**
+ * A Rich flatMap function to keep a running TDigest object for each key and each starting minutes of a day.
+ */
+class MedianWithKeyMapper(stateName: String) extends RichFlatMapFunction[AverageWithKey, AverageWithKey]{
+
+ private var digest: MapState[Long, TDigest] = _
+
+ override def open(parameters: Configuration): Unit = {
+ val descriptor = new MapStateDescriptor[Long, TDigest](stateName, createTypeInformation[Long], createTypeInformation[TDigest])
+ descriptor.setQueryable(stateName+"-query")
+ digest = getRuntimeContext.getMapState(descriptor)
+ }
+
+ override def flatMap(value: AverageWithKey, out: Collector[AverageWithKey]): Unit = {
+ val key = value.slice.start_time_of_day
+ var currentDigest = digest.get(key)
+
+ if(currentDigest == null){
+ currentDigest = TDigest.createDigest(Constants.TDIGEST_COMPRESSION)
+ }
+
+ if(!value.average.avg.isNaN) {
+ currentDigest.add(value.average.avg)
+ digest.put(key, currentDigest)
+ }
+
+ //out.collect(value) //Dont emit as no further processing is required on this stream
+ }
+}
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/PredictionFunction.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/PredictionFunction.scala
index c19b537..072c023 100644
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/PredictionFunction.scala
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/PredictionFunction.scala
@@ -1,6 +1,7 @@
+/*
package com.bhaskardivya.projects.smartgrid.operators
-import com.bhaskardivya.projects.smartgrid.model.{AverageWithKey, MedianLoad, Prediction}
+import com.bhaskardivya.projects.smartgrid.model.{AverageWithKey, MedianLoad, MedianLoadWithKey, Prediction}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.windowing.time.Time
@@ -9,13 +10,14 @@ import org.apache.flink.streaming.api.windowing.time.Time
* @param entity The key for which prediction is made ie.House or Plug
* @param slidingWindow
*/
-class PredictionFunction(entity: String, slidingWindow: Long) extends MapFunction[(AverageWithKey, MedianLoad), Prediction]{
- override def map(value: (AverageWithKey, MedianLoad)): Prediction = {
+class PredictionFunction(entity: String, slidingWindow: Long) extends MapFunction[(AverageWithKey, MedianLoadWithKey), Prediction]{
+ override def map(value: (AverageWithKey, MedianLoadWithKey)): Prediction = {
val avg = value._1.averageValue
- val median = value._2.load
+ val median = value._2.medianLoad
val predictedValue: Double = (avg + median)/ 2.0
Prediction(value._1, value._2, entity, slidingWindow, predictedValue)
}
}
+*/
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/WorkValueProcessWindow.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/WorkValueProcessWindow.scala
new file mode 100644
index 0000000..92dd193
--- /dev/null
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/WorkValueProcessWindow.scala
@@ -0,0 +1,16 @@
+package com.bhaskardivya.projects.smartgrid.operators
+
+import com.bhaskardivya.projects.smartgrid.model.{SensorEvent, SensorKeyObject, TwoWorkEvents}
+import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
+import org.apache.flink.util.Collector
+
+class WorkValueProcessWindow extends ProcessWindowFunction[SensorEvent, TwoWorkEvents, SensorKeyObject, GlobalWindow] {
+ override def process(key: SensorKeyObject, context: Context, elements: Iterable[SensorEvent], out: Collector[TwoWorkEvents]): Unit = {
+
+ elements match {
+ case x: Iterable[SensorEvent] if x.size == 2 => out.collect(TwoWorkEvents(x.toList(0), x.toList(1)))
+ case _ => println("Encountered non-pair CountWindow Work")
+ }
+ }
+}
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/pipeline/DataCleanser.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/pipeline/DataCleanser.scala
index 498c698..6d94286 100644
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/pipeline/DataCleanser.scala
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/pipeline/DataCleanser.scala
@@ -5,7 +5,7 @@ import org.apache.flink.streaming.api.scala._
object DataCleanser {
def clean(stream: DataStream[List[String]]): DataStream[List[String]] = {
stream
- .filter(_.length == 7) // There should be 7 fields in a record
+ .filter(_.lengthCompare(7) == 0) // There should be 7 fields in a record
.filter(row => row(3).toInt == 0 && row(3).toInt == 1) // 'property' can have either 0 or 1 value
}
}
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/pipeline/SourceChooser.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/pipeline/SourceChooser.scala
index 386be00..9c385b1 100644
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/pipeline/SourceChooser.scala
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/pipeline/SourceChooser.scala
@@ -1,7 +1,7 @@
package com.bhaskardivya.projects.smartgrid.pipeline
import com.bhaskardivya.projects.smartgrid.model.SensorEvent
-import com.bhaskardivya.projects.smartgrid.sources.KafkaSource
+import com.bhaskardivya.projects.smartgrid.sources.{FileSource, KafkaSource}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
@@ -14,8 +14,9 @@ object SourceChooser {
source match {
case "kafka" => new KafkaSource().getSource(env, params)
- /*case "file" => new FileSource().getSource(env, params)
- case _ => new FileSource().getSource(env, params)*/
+ case "file" => new FileSource().getSource(env, params)
+ case "simulated" => new FileSource().getSimulatedCSVSource(env, params)
+ case _ => new FileSource().getSource(env, params)
}
}
}
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/SensorEventElasticSearchSink.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/ElasticSearchSink.scala
similarity index 65%
rename from src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/SensorEventElasticSearchSink.scala
rename to src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/ElasticSearchSink.scala
index 29b9724..7b06aa3 100644
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/SensorEventElasticSearchSink.scala
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/ElasticSearchSink.scala
@@ -2,25 +2,27 @@ package com.bhaskardivya.projects.smartgrid.sinks
import java.net.{InetAddress, InetSocketAddress}
-import com.bhaskardivya.projects.smartgrid.model.SensorEvent
+import com.bhaskardivya.projects.smartgrid.util.JSONTrait
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
-object SensorEventElasticSearchSink {
+object ElasticSearchSink {
- def apply(params: ParameterTool, esIndex: String, esIndexType: String): ElasticsearchSink[SensorEvent] ={
+ def apply[T <: JSONTrait](params: ParameterTool, esIndex: String, esIndexType: String): ElasticsearchSink[T] ={
//Initialize Elastic search configuration
val esClusterLocationIP = params.get("es.cluster.ip", "192.168.99.100")
val esClusterLocationPort = params.getInt("es.cluster.port", 9300)
+ val esFlushMaxActions = params.getInt("bulk.flush.max.actions", 100)
+
val config = new java.util.HashMap[String, String]
config.put("cluster.name", params.get("es.cluster.name", "docker-cluster"))
// This instructs the sink to emit after every element, otherwise they would be buffered
- config.put("bulk.flush.max.actions", "1")
+ config.put("bulk.flush.max.actions", esFlushMaxActions.toString)
val transportAddresses = new java.util.ArrayList[InetSocketAddress]
transportAddresses.add(new InetSocketAddress(InetAddress.getByName(esClusterLocationIP), esClusterLocationPort))
- new ElasticsearchSink(config, transportAddresses, new SensorEventElasticSearchSinkFunction(esIndex, esIndexType))
+ new ElasticsearchSink(config, transportAddresses, new ElasticSearchSinkFunction[T](esIndex, esIndexType))
}
-}
+}
\ No newline at end of file
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/PredictionElasticSearchSinkFunction.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/ElasticSearchSinkFunction.scala
similarity index 68%
rename from src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/PredictionElasticSearchSinkFunction.scala
rename to src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/ElasticSearchSinkFunction.scala
index 70ffb92..d43a0d7 100644
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/PredictionElasticSearchSinkFunction.scala
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/ElasticSearchSinkFunction.scala
@@ -1,6 +1,7 @@
package com.bhaskardivya.projects.smartgrid.sinks
import com.bhaskardivya.projects.smartgrid.model.Prediction
+import com.bhaskardivya.projects.smartgrid.util.JSONTrait
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.elasticsearch.action.ActionRequest
@@ -11,9 +12,9 @@ import org.elasticsearch.client.Requests
* @param esIndex ElasticSearch Index name
* @param esType ElasticSearch Index type
*/
-class PredictionElasticSearchSinkFunction(esIndex: String, esType: String) extends ElasticsearchSinkFunction[Prediction]{
+class ElasticSearchSinkFunction[T <: JSONTrait](esIndex: String, esType: String) extends ElasticsearchSinkFunction[T]{
- def createIndexRequest(element: Prediction): ActionRequest = {
+ def createIndexRequest(element: T): ActionRequest = {
val json = element.toJSONString()
Requests.indexRequest
@@ -22,7 +23,7 @@ class PredictionElasticSearchSinkFunction(esIndex: String, esType: String) exten
.source(json)
}
- override def process(element: Prediction, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
+ override def process(element: T, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
indexer.add(createIndexRequest(element))
}
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/HBaseOutputFormatAverageWithKey.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/HBaseOutputFormatAverageWithKey.scala
index 09d529a..193ff10 100644
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/HBaseOutputFormatAverageWithKey.scala
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/HBaseOutputFormatAverageWithKey.scala
@@ -48,7 +48,7 @@ class HBaseOutputFormatAverageWithKey extends OutputFormat[AverageWithKey] {
@throws[IOException]
override def writeRecord(record: AverageWithKey): Unit = {
- val startTime = System.currentTimeMillis();
+ val startTime = System.currentTimeMillis()
// Make sure that the rowkey is sorted by the average values
val put = new Put(Bytes.toBytes(taskNumber + rowNumber) ++ record.bytesRowKey())
put.setDurability(Durability.SKIP_WAL)
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/HBaseSink.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/HBaseSink.scala
deleted file mode 100644
index 920f0aa..0000000
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/HBaseSink.scala
+++ /dev/null
@@ -1,7 +0,0 @@
-package com.bhaskardivya.projects.smartgrid.sinks
-
-class HBaseSink[IN] {
- def getSink(): Unit ={
-
- }
-}
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/PredictionElasticSearchSink.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/PredictionElasticSearchSink.scala
deleted file mode 100644
index 75f807b..0000000
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/PredictionElasticSearchSink.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-package com.bhaskardivya.projects.smartgrid.sinks
-
-import java.net.{InetAddress, InetSocketAddress}
-
-import com.bhaskardivya.projects.smartgrid.model.Prediction
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
-
-object PredictionElasticSearchSink {
-
- def apply(params: ParameterTool, esIndex: String, esIndexType: String): ElasticsearchSink[Prediction] ={
- //Initialize Elastic search configuration
- val esClusterLocationIP = params.get("es.cluster.ip", "192.168.99.100")
- val esClusterLocationPort = params.getInt("es.cluster.port", 9300)
- val config = new java.util.HashMap[String, String]
- config.put("cluster.name", params.get("es.cluster.name", "docker-cluster"))
- // This instructs the sink to emit after every element, otherwise they would be buffered
- config.put("bulk.flush.max.actions", "1")
-
- val transportAddresses = new java.util.ArrayList[InetSocketAddress]
-/*
- transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), esClusterLocationPort))
- transportAddresses.add(new InetSocketAddress(InetAddress.getByName("172.17.0.2"), esClusterLocationPort))
- transportAddresses.add(new InetSocketAddress(InetAddress.getByName("0.0.0.0"), esClusterLocationPort))
- transportAddresses.add(new InetSocketAddress(InetAddress.getByName("localhost"), esClusterLocationPort))
-*/
- transportAddresses.add(new InetSocketAddress(InetAddress.getByName("192.168.99.100"), esClusterLocationPort))
- transportAddresses.add(new InetSocketAddress(InetAddress.getByName(esClusterLocationIP), esClusterLocationPort))
-
- new ElasticsearchSink(config, transportAddresses, new PredictionElasticSearchSinkFunction(esIndex, esIndexType))
- }
-
-}
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/SensorEventElasticSearchSinkFunction.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/SensorEventElasticSearchSinkFunction.scala
deleted file mode 100644
index aca041c..0000000
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/SensorEventElasticSearchSinkFunction.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-package com.bhaskardivya.projects.smartgrid.sinks
-
-import com.bhaskardivya.projects.smartgrid.model.SensorEvent
-import org.apache.flink.api.common.functions.RuntimeContext
-import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
-import org.elasticsearch.action.ActionRequest
-import org.elasticsearch.client.Requests
-
-/**
- * Sink to ElasticSearch to store the prediction values
- * @param esIndex ElasticSearch Index name
- * @param esType ElasticSearch Index type
- */
-class SensorEventElasticSearchSinkFunction(esIndex: String, esType: String) extends ElasticsearchSinkFunction[SensorEvent]{
-
- def createIndexRequest(element: SensorEvent): ActionRequest = {
- val json = element.toJSONString()
-
- Requests.indexRequest
- .index(esIndex)
- .`type`(esType)
- .source(json)
- }
-
- override def process(element: SensorEvent, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
- indexer.add(createIndexRequest(element))
- }
-
-}
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/sources/FileSource.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/sources/FileSource.scala
index 2938a47..e3c4140 100644
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/sources/FileSource.scala
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/sources/FileSource.scala
@@ -1,18 +1,32 @@
package com.bhaskardivya.projects.smartgrid.sources
-import org.apache.flink.api.java.io.TextInputFormat
+import com.bhaskardivya.projects.smartgrid.model.SensorEvent
import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.core.fs.Path
-import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._
class FileSource {
- def getSource(env: StreamExecutionEnvironment, params: ParameterTool): DataStream[String] = {
- val filePath: String = params.get("filePath", "/run/media/osboxes/Data/data/sorted100M.1s.hsum.csv")
- val fileInterval: Long = params.getLong("fileInterval", 100)
+ def getSimulatedCSVSource(env: StreamExecutionEnvironment, params: ParameterTool): DataStream[SensorEvent] = {
+ val data = params.get("input", "/data/data.gz")
+ val maxServingDelay = params.getInt("maxServingDelay", 0)
+ val servingSpeedFactor = params.getFloat("servingSpeedFactor", 1f)
+ val offsetEventTimestamp = params.has("offsetEventTimestamp")
- env.readFile(new TextInputFormat(new Path(filePath)), filePath, FileProcessingMode.PROCESS_ONCE, fileInterval)
+ val events = env.addSource(new CSVFileSource(data, maxServingDelay, servingSpeedFactor, offsetEventTimestamp))
+ .name("CSV GZ File")
+
+ events
+ }
+
+ def getSource(env: StreamExecutionEnvironment, params: ParameterTool): DataStream[SensorEvent] = {
+ val filePath: String = params.get("input", "/data/data.gz")
+
+ // read the CSV GZ and assign Timestamp
+ val csv: DataStream[SensorEvent] = env.readTextFile(filePath)
+ .map[SensorEvent](line => SensorEvent.fromString(line))
+ .assignTimestampsAndWatermarks(SensorEvent.tsAssigner())
+
+ csv
}
}
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/sources/HBaseMedianSource.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/sources/HBaseMedianSource.scala
index 2e90489..da25d4a 100644
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/sources/HBaseMedianSource.scala
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/sources/HBaseMedianSource.scala
@@ -28,23 +28,23 @@ object HBaseMedianSource extends Serializable{
val scan = new Scan()
if (avg != null && avg.key != null) {
println("HBaseMedianSource | getMedian | ColumnQualifier is set")
- scan.addColumn(columnFamily.getBytes(), avg.key.toColumnString().getBytes())
+ scan.addColumn(columnFamily.getBytes(), avg.key.toColumnString.getBytes)
}
else {
scan.addFamily(columnFamily.getBytes())
}
- println("HBaseMedianSource | getMedian | Fetching Median " + table + " | " + columnFamily + " | " + avg.key.toColumnString())
+ println("HBaseMedianSource | getMedian | Fetching Median " + table + " | " + columnFamily + " | " + avg.key.toColumnString)
//println("BD | " + conf.toString)
try {
val aggregationClient: AggregationClient = new AggregationClient(conf)
val median = aggregationClient.median(TableName.valueOf(table), new DoubleColumnInterpreter(), scan)
- println("HBaseMedianSource | getMedian | Median Fetched " + table + " | " + columnFamily + " | " + avg.key.toColumnString())
+ println("HBaseMedianSource | getMedian | Median Fetched " + table + " | " + columnFamily + " | " + avg.key.toColumnString)
if (median == null) return 0.0
median / 1000.0
}catch {
case e: Exception => println("Exception fetching median" + e)
}finally {
- println("GetMedian Took " + (System.currentTimeMillis() - startTime) + "ms for " + avg.key.toColumnString())
+ println("GetMedian Took " + (System.currentTimeMillis() - startTime) + "ms for " + avg.key.toColumnString)
}
0.0
}
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/sources/KafkaSource.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/sources/KafkaSource.scala
index 8daf230..ee2e9c4 100644
--- a/src/main/scala/com/bhaskardivya/projects/smartgrid/sources/KafkaSource.scala
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/sources/KafkaSource.scala
@@ -19,6 +19,7 @@ class KafkaSource() {
val topic = params.get("topic", "test")
val server = params.get("bootstrap.servers", "localhost:9092")
val groupId = params.get("group.id", "test-" + System.currentTimeMillis().toString)
+ val startFromEarliest = params.getBoolean("start-from-earliest", true)
// Create properties map for Kafka
val properties = new Properties()
@@ -26,9 +27,15 @@ class KafkaSource() {
properties.setProperty("group.id", groupId)
val consumer = new FlinkKafkaConsumer011[SensorEvent](topic, SensorEvent.schema(env.getConfig), properties)
- consumer.setStartFromEarliest()
- LOG.info("Created Source from kafka - Topic: {}, Server: {}, Consumer Group: {}", topic, server, groupId);
+ // Always start from the earliest kafka offset
+ if(startFromEarliest)
+ consumer.setStartFromEarliest()
+
+ // Assign the Timestamp and Watermark from the SensorEvent's timestamp field
+ consumer.assignTimestampsAndWatermarks(SensorEvent.tsAssigner())
+
+ LOG.info("Created Source from kafka - Topic: {}, Server: {}, Consumer Group: {}", topic, server, groupId)
env.addSource(consumer)
}
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/util/JSONTrait.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/util/JSONTrait.scala
new file mode 100644
index 0000000..1e09279
--- /dev/null
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/util/JSONTrait.scala
@@ -0,0 +1,7 @@
+package com.bhaskardivya.projects.smartgrid.util
+
+trait JSONTrait {
+
+ def toJSONString() : String
+
+}
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/util/TDigestMedian.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/util/TDigestMedian.scala
new file mode 100644
index 0000000..2e09de1
--- /dev/null
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/util/TDigestMedian.scala
@@ -0,0 +1,21 @@
+package com.bhaskardivya.projects.smartgrid.util
+
+import com.tdunning.math.stats.TDigest
+import java.io.Serializable
+
+@SerialVersionUID(1L)
+class TDigestMedian() extends Serializable {
+ private var totalDigest: TDigest = _
+
+ this.setTotalDigest(TDigest.createDigest(100))
+
+ def getTotalDigest: TDigest = totalDigest
+
+ def setTotalDigest(totalDigest: TDigest) {
+ this.totalDigest = totalDigest
+ }
+
+ def addDigest(digest: Double): Unit = this.totalDigest.add(digest)
+
+ def getMedian: Double = this.totalDigest.quantile(0.5)
+}
\ No newline at end of file