diff --git a/.circleci/config.yml b/.circleci/config.yml new file mode 100644 index 0000000..44ee543 --- /dev/null +++ b/.circleci/config.yml @@ -0,0 +1,55 @@ +# Java Maven CircleCI 2.0 configuration file +# +# Check https://circleci.com/docs/2.0/language-java/ for more details +# +# Continuous Integration! +version: 2 +jobs: + build: + docker: + # specify the version you desire here + - image: circleci/openjdk:8-jdk + + # Specify service dependencies here if necessary + # CircleCI maintains a library of pre-built images + # documented at https://circleci.com/docs/2.0/circleci-images/ + # - image: circleci/postgres:9.4 + + working_directory: ~/repo + + environment: + # Customize the JVM maximum heap limit + MAVEN_OPTS: -Xmx3200m + + steps: + - checkout + + # Download and cache dependencies + - restore_cache: + keys: + - v1-dependencies-{{ checksum "pom.xml" }} + # fallback to using the latest cache if no exact match is found + - v1-dependencies- + + - run: mvn package -P build-jar dependency:go-offline + + - save_cache: + paths: + - ~/.m2 + key: v1-dependencies-{{ checksum "pom.xml" }} + + # run tests! 
+ #- run: mvn integration-test + + - run: | + set -xu + mkdir -p /tmp/artifacts + cp target/smartgrid*.jar /tmp/artifacts + + # Save artifacts + - store_artifacts: + path: /tmp/artifacts + destination: build + + + diff --git a/.idea/hydra.xml b/.idea/hydra.xml new file mode 100644 index 0000000..123e89c --- /dev/null +++ b/.idea/hydra.xml @@ -0,0 +1,9 @@ + + + + + \ No newline at end of file diff --git a/docker/Docker-compose.yml b/docker/Docker-compose.yml deleted file mode 100755 index e07fbd7..0000000 --- a/docker/Docker-compose.yml +++ /dev/null @@ -1,16 +0,0 @@ - -version: '2.1' -services: -#Flink + Kafka + Hbase + Zookeeper - flink-kafka-hbase: - image: - container -# Elasticsearch - elasticsearch: - -# Kibana - kibana: - -# Kafka-manager - kafka-manager: - \ No newline at end of file diff --git a/docker/Flink-Hbase-Kafka/docker-compose.yml b/docker/Flink-Hbase-Kafka/docker-compose.yml new file mode 100755 index 0000000..e97db32 --- /dev/null +++ b/docker/Flink-Hbase-Kafka/docker-compose.yml @@ -0,0 +1,29 @@ +version: '2.1' +services: + flink-kafka-hbase: + image: quay.io/koldbyte/flink-cluster + container_name: flink_to_hbase + hostname: hbase-docker + ports: + - 8080:8080 + - 8085:8085 + - 9090:9090 + - 9092:9092 + - 9095:9095 + - 2181:2181 + - 16000:16000 + - 16010:16010 + - 16020:16020 + - 16030:16030 + - 6123:6123 + - 7203:7203 + - 8081:8081 + - 8090:8090 + environment: + KAFKA_ADVERTISED_HOST_NAME: hbase-docker + volumes: + - /data + external_links: + - elasticsearch + - kibana + - kafka-manager diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml new file mode 100755 index 0000000..6b7b24a --- /dev/null +++ b/docker/docker-compose.yml @@ -0,0 +1,67 @@ +version: '2.1' +services: + flink-kafka-hbase: + domainname: smartgrid.com + extends: + service: flink-kafka-hbase + file: ./Flink-Hbase-Kafka/docker-compose.yml + networks: + - smartgrid.com + volumes: + - /app/smartgrid/data:/data + links: + - elasticsearch + - elasticsearch2 + + 
elasticsearch: + domainname: smartgrid.com + extends: + service: elasticsearch + file: ./elasticsearch/docker-compose.yml + networks: + - smartgrid.com + volumes: + - /app/smartgrid/es-data:/usr/share/elasticsearch/data + + elasticsearch2: + domainname: smartgrid.com + extends: + service: elasticsearch2 + file: ./elasticsearch/docker-compose.yml + networks: + - smartgrid.com + volumes: + - /app/smartgrid/es-data-2:/usr/share/elasticsearch/data + + kibana: + domainname: smartgrid.com + extends: + service: kibana + file: ./kibana/docker-compose.yml + networks: + - smartgrid.com + depends_on: + - elasticsearch + links: + - elasticsearch + - elasticsearch2 + + kafka_manager: + domainname: smartgrid.com + extends: + service: kafka_manager + file: ./kafka-manager/docker-compose.yml + networks: + - smartgrid.com + volumes: + - /app/smartgrid/kafka-manager:/kafka-manager/configuration + depends_on: + - flink-kafka-hbase + links: + - flink-kafka-hbase + - "flink-kafka-hbase:hbase-docker" + +networks: + smartgrid.com: + driver: bridge + name: smartgrid.com \ No newline at end of file diff --git a/docker/elasticsearch/docker-compose.yml b/docker/elasticsearch/docker-compose.yml index 97f4364..0b6dbd7 100755 --- a/docker/elasticsearch/docker-compose.yml +++ b/docker/elasticsearch/docker-compose.yml @@ -1,54 +1,59 @@ -elasticsearch: - image: docker.elastic.co/elasticsearch/elasticsearch:5.1.2 - ports: - - 9200:9200 - - 9300:9300 - container_name: elasticsearch - ulimits: - memlock: - soft: -1 - hard: -1 - mem_limit: 1g - environment: - - cluster.name=docker-cluster - - node.name=one - - bootstrap.memory_lock=false - - xpack.security.enabled=false - - "ES_JAVA_OPTS=-Xms512m -Xmx512m" - - network.publish_host=192.168.99.100 - - transport.publish_port=9300 - - transport.host=0.0.0.0 - - transport.tcp.port=9300 - - network.host=0.0.0.0 - - http.host=0.0.0.0 - - http.port=9200 - volumes: - - /usr/share/elasticsearch/data +version: '2.1' +services: + elasticsearch: + image: 
docker.elastic.co/elasticsearch/elasticsearch:5.1.2 + hostname: elasticsearch + ports: + - 9200:9200 + - 9300:9300 + container_name: elasticsearch + ulimits: + memlock: + soft: -1 + hard: -1 + mem_limit: 1g + environment: + - cluster.name=docker-cluster + - node.name=one + - bootstrap.memory_lock=false + - xpack.security.enabled=false + - "ES_JAVA_OPTS=-Xms512m -Xmx512m" + - network.publish_host=elasticsearch + - transport.publish_port=9300 + - transport.host=0.0.0.0 + - transport.tcp.port=9300 + - network.host=0.0.0.0 + - http.host=0.0.0.0 + - http.port=9200 + volumes: + - /usr/share/elasticsearch/data -elasticsearch2: - image: docker.elastic.co/elasticsearch/elasticsearch:5.1.2 - ports: - - 9201:9200 - - 9301:9300 - container_name: elasticsearch2 - ulimits: - memlock: - soft: -1 - hard: -1 - mem_limit: 1g - environment: - - cluster.name=docker-cluster - - node.name=two - - bootstrap.memory_lock=false - - xpack.security.enabled=false - - "ES_JAVA_OPTS=-Xms512m -Xmx512m" - - network.publish_host=192.168.99.100 - - transport.publish_port=9301 - - "discovery.zen.ping.unicast.hosts=192.168.99.100" - - "discovery.zen.minimum_master_nodes=2" - - network.host=0.0.0.0 - - transport.host=0.0.0.0 - - transport.tcp.port=9300 - - http.host=0.0.0.0 - volumes: - - /usr/share/elasticsearch/data + elasticsearch2: + image: docker.elastic.co/elasticsearch/elasticsearch:5.1.2 + hostname: elasticsearch2 + ports: + - 9201:9200 + - 9301:9300 + container_name: elasticsearch2 + ulimits: + memlock: + soft: -1 + hard: -1 + mem_limit: 1g + environment: + - cluster.name=docker-cluster + - node.name=two + - bootstrap.memory_lock=false + - xpack.security.enabled=false + - "ES_JAVA_OPTS=-Xms512m -Xmx512m" + - network.publish_host=elasticsearch2 + - transport.publish_port=9300 + - "discovery.zen.ping.unicast.hosts=elasticsearch" + - "discovery.zen.minimum_master_nodes=2" + - network.host=0.0.0.0 + - transport.host=0.0.0.0 + - transport.tcp.port=9300 + - http.host=0.0.0.0 + - http.port=9200 + 
volumes: + - /usr/share/elasticsearch/data diff --git a/docker/kafka-manager/Docker-compose.yml b/docker/kafka-manager/docker-compose.yml similarity index 61% rename from docker/kafka-manager/Docker-compose.yml rename to docker/kafka-manager/docker-compose.yml index f6a0b69..982f33a 100755 --- a/docker/kafka-manager/Docker-compose.yml +++ b/docker/kafka-manager/docker-compose.yml @@ -1,9 +1,12 @@ -services: - kafka_manager: - image: hlebalbau/kafka-manager - ports: - - "9000:9000" - environment: - ZK_HOSTS: "zoo:2181" - APPLICATION_SECRET: "random-secret" +version: '2.1' +services: + kafka_manager: + image: hlebalbau/kafka-manager + hostname: kafka-manager + container_name: kafka-manager + ports: + - "9000:9000" + environment: + ZK_HOSTS: "hbase-docker:2181/kafka" + APPLICATION_SECRET: "random-secret" command: -Dpidfile.path=/dev/null \ No newline at end of file diff --git a/docker/kibana/Docker-compose.yml b/docker/kibana/docker-compose.yml similarity index 56% rename from docker/kibana/Docker-compose.yml rename to docker/kibana/docker-compose.yml index 7214ea9..5617be5 100755 --- a/docker/kibana/Docker-compose.yml +++ b/docker/kibana/docker-compose.yml @@ -1,11 +1,14 @@ -version: '2' -services: - kibana: - image: kibana:5.1.2 - container_name: kibana - restart: always - network_mode: "bridge" - ports: - - "5601:5601" - external_links: - - elasticsearch:elasticsearch +version: '2.1' +services: + kibana: + image: kibana:5.1.2 + hostname: kibana + container_name: kibana + restart: always + ports: + - "5601:5601" + environment: + - ELASTICSEARCH_URL=http://elasticsearch:9200 + - SERVER_NAME=elasticsearch + external_links: + - elasticsearch:elasticsearch diff --git a/pom.xml b/pom.xml index 2eaa215..9c5fe3d 100644 --- a/pom.xml +++ b/pom.xml @@ -58,6 +58,7 @@ under the License. 5.1.2 0.11.0.2 + 3.2 @@ -159,6 +160,11 @@ under the License. 
flink-connector-elasticsearch5_${scala.binary.version} ${flink.version} + + com.tdunning + t-digest + ${tdigest.version} + diff --git a/smartgrid.iml b/smartgrid.iml index e8f7a49..e75d68b 100644 --- a/smartgrid.iml +++ b/smartgrid.iml @@ -10,6 +10,26 @@ + + + + + + + + + + + + + + + + + + + + @@ -126,7 +146,6 @@ - @@ -150,6 +169,7 @@ + diff --git a/src/main/resources/META-INF/MANIFEST.MF b/src/main/resources/META-INF/MANIFEST.MF index f7055b0..c89a621 100644 --- a/src/main/resources/META-INF/MANIFEST.MF +++ b/src/main/resources/META-INF/MANIFEST.MF @@ -1,4 +1,3 @@ Manifest-Version: 1.0 -Main-Class: com.bhaskardivya.projects.smartgrid.SmartGridProcessorFrom - File +Main-Class: com.bhaskardivya.projects.smartgrid.job.PlugAveragingJob diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/archived/SensorEventHouseAveragingJob.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/archived/SensorEventHouseAveragingJob.scala deleted file mode 100644 index 7d6a64f..0000000 --- a/src/main/scala/com/bhaskardivya/projects/smartgrid/archived/SensorEventHouseAveragingJob.scala +++ /dev/null @@ -1,98 +0,0 @@ -package com.bhaskardivya.projects.smartgrid.archived - -import com.bhaskardivya.projects.smartgrid.base.AbstractKeyGetter -import com.bhaskardivya.projects.smartgrid.model._ -import com.bhaskardivya.projects.smartgrid.operators.AverageAggregateWithKey -import com.bhaskardivya.projects.smartgrid.pipeline._ -import com.bhaskardivya.projects.smartgrid.sinks.HBaseOutputFormatAverageWithKey -import org.apache.flink.api.java.utils.ParameterTool -import org.apache.flink.core.fs.FileSystem -import org.apache.flink.streaming.api.TimeCharacteristic -import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction -import org.apache.flink.streaming.api.scala._ -import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows -import org.apache.flink.streaming.api.windowing.time.Time - -object SensorEventHouseAveragingJob { - def main(args: 
Array[String]): Unit = { - // parse parameters - val params = ParameterTool.fromArgs(args) - - // Initialise the environment for flink - val env = StreamExecutionEnvironment.getExecutionEnvironment - - // will be using the timestamp from the records - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - - // Get the stream according to params - val stream: DataStream[SensorEvent] = SourceChooser.from(env, params).name("Sensor Parsed Data") - - val withTimestamps = stream - .assignTimestampsAndWatermarks(SensorEvent.tsAssigner()) - .name("Kafka Source with TS") - - val windowed1min = withTimestamps - .keyBy(keyGetter(_)) - .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(Constants.SLIDING_INTERVAL))) - .aggregate(new AverageAggregateWithKey(keyGetter)) - .name("Average for 1 min Window") - - val windowed5min = windowed1min - .keyBy(_.key) - .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.seconds(Constants.SLIDING_INTERVAL))) - .reduce(AverageWithKey.reducer) - .name("Average for 5 min Window") - - val windowed15min = windowed5min - .keyBy(_.key) - .window(SlidingEventTimeWindows.of(Time.minutes(15), Time.seconds(Constants.SLIDING_INTERVAL))) - .reduce(AverageWithKey.reducer) - .name("Average for 15 min Window") - - val windowed60min = windowed15min - .keyBy(_.key) - .window(SlidingEventTimeWindows.of(Time.minutes(60), Time.seconds(Constants.SLIDING_INTERVAL))) - .reduce(AverageWithKey.reducer) - .name("Average for 60 min Window") - - val windowed120min = windowed60min - .keyBy(_.key) - .window(SlidingEventTimeWindows.of(Time.minutes(120), Time.seconds(Constants.SLIDING_INTERVAL))) - .reduce(AverageWithKey.reducer) - .name("Average for 120 min Window") - - val debug = params.has("debug") - if(debug){ - windowed1min.writeAsCsv("/data/output.windowed1min.csv", FileSystem.WriteMode.OVERWRITE) - windowed5min.writeAsCsv("/data/output.windowed5min.csv", FileSystem.WriteMode.OVERWRITE) - 
windowed15min.writeAsCsv("/data/output.windowed15min.csv", FileSystem.WriteMode.OVERWRITE) - windowed60min.writeAsCsv("/data/output.windowed60min.csv", FileSystem.WriteMode.OVERWRITE) - windowed120min.writeAsCsv("/data/output.windowed120min.csv", FileSystem.WriteMode.OVERWRITE) - } - - windowed1min.addSink(new OutputFormatSinkFunction[AverageWithKey](new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_1MIN, Constants.HOUSE_CF))) - .name("House - 1 Min Window") - - windowed5min.addSink(new OutputFormatSinkFunction[AverageWithKey](new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_5MIN, Constants.HOUSE_CF))) - .name("House - 5 Min Window") - - windowed15min.addSink(new OutputFormatSinkFunction[AverageWithKey](new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_15MIN, Constants.HOUSE_CF))) - .name("House - 15 Min Window") - - windowed60min.addSink(new OutputFormatSinkFunction[AverageWithKey](new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_60MIN, Constants.HOUSE_CF))) - .name("House - 60 Min Window") - - windowed120min.addSink(new OutputFormatSinkFunction[AverageWithKey](new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_120MIN, Constants.HOUSE_CF))) - .name("House - 120 Min Window") - - env.execute("Sensor Event House Averaging Job (Kafka to HBase Averages) ") - - } - - object keyGetter extends AbstractKeyGetter{ - def apply(element: SensorEvent): SensorKeyObject = { - SensorKeyObject(element.house_id) - } - } - -} \ No newline at end of file diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/archived/SensorEventHouseAveragingJob2.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/archived/SensorEventHouseAveragingJob2.scala deleted file mode 100644 index 506667d..0000000 --- a/src/main/scala/com/bhaskardivya/projects/smartgrid/archived/SensorEventHouseAveragingJob2.scala +++ /dev/null @@ -1,79 +0,0 @@ -package com.bhaskardivya.projects.smartgrid.archived - -import 
com.bhaskardivya.projects.smartgrid.model.{Constants, SensorEvent} -import com.bhaskardivya.projects.smartgrid.operators.AverageAggregate -import com.bhaskardivya.projects.smartgrid.pipeline._ -import org.apache.flink.api.java.utils.ParameterTool -import org.apache.flink.core.fs.FileSystem -import org.apache.flink.streaming.api.TimeCharacteristic -import org.apache.flink.streaming.api.scala._ -import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows -import org.apache.flink.streaming.api.windowing.time.Time - -object SensorEventHouseAveragingJob2 { - def main(args: Array[String]): Unit = { - // parse parameters - val params = ParameterTool.fromArgs(args) - - // Initialise the environment for flink - val env = StreamExecutionEnvironment.getExecutionEnvironment - - // will be using the timestamp from the records - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - - // Get the stream according to params - val stream: DataStream[SensorEvent] = SourceChooser.from(env, params).name("Sensor Raw") - - val withTimestampsKeyed = stream - //id, timestamp, value, property, plug_id, household_id, house_id - //.map(x => SensorEvent(x(0).toLong, x(1).toLong, x(2).toDouble, x(3).toInt, x(4).toLong, x(5).toLong, x(6).toLong)) - .assignTimestampsAndWatermarks(SensorEvent.tsAssigner()) - .name("Kafka Source with TS") - .keyBy(_.house_id) - - val windowed1min = withTimestampsKeyed - .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(Constants.SLIDING_INTERVAL))) - .aggregate(new AverageAggregate())//, new AverageWindowFunction()) - .name("Average for 1 min Window") - - windowed1min.writeAsCsv("/data/window_1min_out.csv", FileSystem.WriteMode.OVERWRITE) - - val windowed5min = withTimestampsKeyed - .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.seconds(Constants.SLIDING_INTERVAL))) - .aggregate(new AverageAggregate())//, new AverageWindowFunction()) - .name("Average for 5 min Window") - - val windowed15min = 
withTimestampsKeyed - .window(SlidingEventTimeWindows.of(Time.minutes(15), Time.seconds(Constants.SLIDING_INTERVAL))) - .aggregate(new AverageAggregate())//, new AverageWindowFunction()) - .name("Average for 15 min Window") - - val windowed60min = withTimestampsKeyed - .window(SlidingEventTimeWindows.of(Time.minutes(60), Time.seconds(Constants.SLIDING_INTERVAL))) - .aggregate(new AverageAggregate())//, new AverageWindowFunction()) - .name("Average for 60 min Window") - - val windowed120min = withTimestampsKeyed - .window(SlidingEventTimeWindows.of(Time.minutes(120), Time.seconds(Constants.SLIDING_INTERVAL))) - .aggregate(new AverageAggregate())//, new AverageWindowFunction()) - .name("Average for 120 min Window") - - /* windowed1min.addSink(new OutputFormatSinkFunction[(Long, Average)](new HBaseOutputFormat().of(Constants.TABLE_1MIN, Constants.HOUSE_CF))) - .name("House - 1 Min Window") - - windowed5min.addSink(new OutputFormatSinkFunction[(Long, Average)](new HBaseOutputFormat().of(Constants.TABLE_5MIN, Constants.HOUSE_CF))) - .name("House - 5 Min Window") - - windowed15min.addSink(new OutputFormatSinkFunction[(Long, Average)](new HBaseOutputFormat().of(Constants.TABLE_15MIN, Constants.HOUSE_CF))) - .name("House - 15 Min Window") - - windowed60min.addSink(new OutputFormatSinkFunction[(Long, Average)](new HBaseOutputFormat().of(Constants.TABLE_60MIN, Constants.HOUSE_CF))) - .name("House - 60 Min Window") - - windowed120min.addSink(new OutputFormatSinkFunction[(Long, Average)](new HBaseOutputFormat().of(Constants.TABLE_120MIN, Constants.HOUSE_CF))) - .name("House - 120 Min Window")*/ - - env.execute("Sensor Event House Averaging Job (Kafka to HBase Averages) ") - - } -} \ No newline at end of file diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/archived/SensorEventPlugAveragingJob.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/archived/SensorEventPlugAveragingJob.scala deleted file mode 100644 index 74caacb..0000000 --- 
a/src/main/scala/com/bhaskardivya/projects/smartgrid/archived/SensorEventPlugAveragingJob.scala +++ /dev/null @@ -1,90 +0,0 @@ -package com.bhaskardivya.projects.smartgrid.archived - -import com.bhaskardivya.projects.smartgrid.model.{Average, Constants, SensorEvent} -import com.bhaskardivya.projects.smartgrid.operators.{AverageAggregate, AverageWindowFunction} -import com.bhaskardivya.projects.smartgrid.pipeline._ -import org.apache.flink.api.java.utils.ParameterTool -import org.apache.flink.streaming.api.TimeCharacteristic -import org.apache.flink.streaming.api.scala._ -import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows -import org.apache.flink.streaming.api.windowing.time.Time - -object SensorEventPlugAveragingJob { - def main(args: Array[String]): Unit = { - // parse parameters - val params = ParameterTool.fromArgs(args) - - // Initialise the environment for flink - val env = StreamExecutionEnvironment.getExecutionEnvironment - - // will be using the timestamp from the records - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - - // Get the stream according to params - val stream: DataStream[SensorEvent] = SourceChooser.from(env, params).name("Sensor Raw") - - //no need to add timestamps as kafka source already has it - //val withTimestamps = stream - //id, timestamp, value, property, plug_id, household_id, house_id - //.map(x => SensorEvent(x(0).toLong, x(1).toLong, x(2).toDouble, x(3).toInt, x(4).toLong, x(5).toLong, x(6).toLong)) - //.assignTimestampsAndWatermarks(new PunctuatedAssigner()) - - val windowed1min = stream - .keyBy(_.plug_id) - .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(Constants.SLIDING_INTERVAL))) - .aggregate(new AverageAggregate(), new AverageWindowFunction()) - .name("Average for 1 min Window") - - val windowed5min = windowed1min - .keyBy(_._1) - .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.seconds(Constants.SLIDING_INTERVAL))) - .reduce( - (a: (Long,Average), b: 
(Long, Average)) => (a._1, a._2+b._2) - ) - .name("Average for 5 min Window") - - val windowed15min = windowed5min - .keyBy(_._1) - .window(SlidingEventTimeWindows.of(Time.minutes(15), Time.seconds(Constants.SLIDING_INTERVAL))) - .reduce( - (a: (Long,Average), b: (Long, Average)) => (a._1, a._2+b._2) - ) - .name("Average for 15 min Window") - - val windowed60min = windowed15min - .keyBy(_._1) - .window(SlidingEventTimeWindows.of(Time.minutes(60), Time.seconds(Constants.SLIDING_INTERVAL))) - .reduce( - (a: (Long,Average), b: (Long, Average)) => (a._1, a._2+b._2) - ) - .name("Average for 60 min Window") - - val windowed120min = windowed60min - .keyBy(_._1) - .window(SlidingEventTimeWindows.of(Time.minutes(120), Time.seconds(Constants.SLIDING_INTERVAL))) - .reduce( - (a: (Long,Average), b: (Long, Average)) => (a._1, a._2+b._2) - ) - .name("Average for 120 min Window") -/* - - windowed1min.writeUsingOutputFormat(new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_1MIN, Constants.HOUSE_CF)) - .name("Plug - 1 Min Window") - - windowed5min.writeUsingOutputFormat(new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_5MIN, Constants.HOUSE_CF)) - .name("Plug - 5 Min Window") - - windowed15min.writeUsingOutputFormat(new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_15MIN, Constants.HOUSE_CF)) - .name("Plug - 15 Min Window") - - windowed60min.writeUsingOutputFormat(new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_60MIN, Constants.HOUSE_CF)) - .name("Plug - 60 Min Window") - - windowed120min.writeUsingOutputFormat(new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_120MIN, Constants.HOUSE_CF)) - .name("Plug - 120 Min Window") -*/ - - env.execute("Sensor Event Plug Averaging Job (Kafka to HBase Averages) ") - - } -} \ No newline at end of file diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/base/PredictionJobSingleWindowBase.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/base/PredictionJobSingleWindowBase.scala new file 
mode 100644
index 0000000..f41399c
--- /dev/null
+++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/base/PredictionJobSingleWindowBase.scala
@@ -0,0 +1,225 @@
+package com.bhaskardivya.projects.smartgrid.base
+
+import java.io.File
+
+import com.bhaskardivya.projects.smartgrid.model._
+import com.bhaskardivya.projects.smartgrid.operators._
+import com.bhaskardivya.projects.smartgrid.pipeline._
+import com.bhaskardivya.projects.smartgrid.sinks.ElasticSearchSink
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.{DataStream, _}
+import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+/**
+  * Abstract class that represents the Main Job that calculates the
+  * averages and medians which are being used here itself to predict
+  * the load forecast.
+  *
+  * Subclasses provide the key (getKeyName / getKey) and the initializeFlow
+  * step; main() wires the pipeline together and executes it.
+  */
+abstract class PredictionJobSingleWindowBase extends Serializable {
+
+  //register implicits for types
+  implicit val typeInfoAverageWithKey: TypeInformation[AverageWithKey] = TypeInformation.of(classOf[AverageWithKey])
+  implicit val typeInfoPrediction2: TypeInformation[Prediction] = TypeInformation.of(classOf[Prediction])
+  implicit val typeInfoTime: TypeInformation[Time] = TypeInformation.of(classOf[Time])
+
+  // Root directory for this job's debug/log output.
+  // lazy: getKeyName() is abstract and must not be called while this base class is still being constructed.
+  private lazy val LOG_DIR = "/data/" + getKeyName()
+
+  /**
+    * Method that returns the name of the key in the source datum
+    *
+    * @return String key name
+    */
+  def getKeyName(): String
+
+  /**
+    * Method that returns the value of the key in the source datum
+    *
+    * @param element SensorEvent record
+    * @return SensorKeyObject key value
+    */
+  def getKey(element: SensorEvent): SensorKeyObject
+
+  // Adapter exposing getKey as an AbstractKeyGetter so it can be handed to operators (e.g. WorkValueFlatMap)
+  object keyGetter extends AbstractKeyGetter {
+    def apply(element: SensorEvent): SensorKeyObject = {
+      getKey(element)
+    }
+  }
+
+  /**
+    * Method to prepare the raw events properly aggregated(sum) based on the key
+    * @param dataStream source raw data stream
+    * @return the prepared stream; main() only applies this step when the "deduplicate" parameter is present
+    */
+  def initializeFlow(dataStream: DataStream[SensorEvent]): DataStream[SensorEvent]
+
+  // Window durations in minutes; createMedianState builds one tumbling window per entry
+  val windowDurations = List(5)
+
+  /**
+    * Builds and runs the prediction pipeline.
+    *
+    * Parameters read here: "deduplicate", "sink.raw", "sink.5min" and, via the helper
+    * methods, "sliding.interval" and "debug"; params is also handed to SourceChooser and ElasticSearchSink.
+    *
+    * @param args command line arguments, parsed with ParameterTool.fromArgs
+    */
+  def main(args: Array[String]): Unit = {
+
+    // Create the log dirs (best effort: a failure is reported but does not stop the job).
+    // File.mkdirs() signals failure through its return value and does not throw when the
+    // directory already exists, so the result is checked instead of relying on an exception.
+    List("", "/state/", "/input/", "/output_avg/", "/output_prediction/")
+      .map(suffix => new File(LOG_DIR + suffix))
+      .foreach { dir =>
+        try {
+          if (!dir.isDirectory && !dir.mkdirs()) println("Could not create directory " + dir.getPath)
+        } catch {
+          case e: SecurityException => println("Not allowed to create directory " + dir.getPath + ": " + e.getMessage)
+        }
+      }
+
+    // parse parameters
+    val params = ParameterTool.fromArgs(args)
+
+    // Initialise the environment for flink
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    // will be using the timestamp from the records
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    //env.enableCheckpointing(10000) // checkpoint every 10000 msecs
+    //env.setStateBackend(new FsStateBackend(LOG_DIR +"/state/"))
+
+    // Get the stream according to params
+    val rawStream: DataStream[SensorEvent] = SourceChooser.from(env, params).name("Sensor Source with Timestamp")
+
+    // Create a stream with sum according to the key specified
+    val initializedFlow = if (params.has("deduplicate")) initializeFlow(rawStream) else rawStream
+
+    if(params.getBoolean("sink.raw", false))
+      initializedFlow
+        .addSink(ElasticSearchSink[SensorEvent](params, Constants.ES_INDEX_NAME, Constants.ES_INDEX_TYPE_RAW))
+        .name("Sensor Raw to ES")
+
+    // Create a Global Window for work values which will output the missing load values
+    val averageUsingWorkValues: DataStream[AverageWithKey] = rawStream
+      .filter(_.property == Constants.PROPERTY_WORK)
+      .keyBy(e => getKey(e))
+      .countWindow(2, 1)
+      .process(new WorkValueProcessWindow)
+      .flatMap(new WorkValueFlatMap(60)(keyGetter)) //60 seconds
+
+    val averageWithKeys = initializedFlow
+      .map(e => AverageWithKey(keyGetter(e), Slice(Time.minutes(1))(e.timestamp), Average(e.value, 1)))
+      // Assumption: If both the Load values and work Values are available in a slice,
+      // Average of them will still be closer to the real measurement
+      .union(averageUsingWorkValues)
+
+    //create median states for various window duration
+    createMedianState(averageWithKeys)
+
+    // Create the average sliding window stream and corresponding Prediction Stream
+    val windowed_average_5min = createAverageStream(params, 5, averageWithKeys)
+    if(params.getBoolean("sink.5min", false)) {
+      val prediction_5min = createPredictionStream(params, 5, windowed_average_5min)
+      createPredictionSink(params, prediction_5min, Constants.ES_INDEX_TYPE_5MIN)
+    }
+
+    env.execute("Sensor Event " + getKeyName() + " Prediction Job")
+  }
+
+  /**
+    * Helper function to create median MapState for each window duration
+    * @param averageWithKeyStream DataStream of AverageWithKey
+    */
+  def createMedianState(averageWithKeyStream: DataStream[AverageWithKey]): Unit = {
+
+    // Streams for each window duration for the average
+    windowDurations.foreach(duration => {
+
+      val stateName = getStateName(duration)
+
+      val avg_windowed = averageWithKeyStream
+        .keyBy(_.key)
+        .window(TumblingEventTimeWindows.of(Time.minutes(duration)))
+        .reduce(new AverageWithKeyReducer)
+        .name(getKeyName() + " Average for " + duration + " min Tumbling Window")
+
+      // Store median as operator state
+      avg_windowed
+        .keyBy(_.key)
+        .flatMap(new MedianWithKeyMapper(stateName))
+        .name(getKeyName() + " Median state for " + duration + " min Tumbling Window")
+
+    })
+  }
+
+  /**
+    * Rebuilds each record's slice for the given duration and reduces the records per key over a sliding event-time window.
+    *
+    * @param params       job parameters; "sliding.interval" (seconds) overrides Constants.SLIDING_INTERVAL
+    * @param duration     window size in minutes
+    * @param sourceStream stream of AverageWithKey records to aggregate
+    * @return the stream produced by reducing every key's window with AverageWithKeyReducer
+    */
+  def createAverageStream(params: ParameterTool, duration: Int, sourceStream: DataStream[AverageWithKey]): DataStream[AverageWithKey] = {
+
+    val slidingInterval = params.getInt("sliding.interval", Constants.SLIDING_INTERVAL)
+
+    sourceStream
+      // Map the Correct Slice duration. The map is stateless, so no keyBy is needed in front of it;
+      // keying once, right before the window, avoids an extra repartitioning of the stream.
+      .map(e => AverageWithKey(e.key, Slice(Time.minutes(duration))(e.slice.timestamp), e.average))
+      .keyBy(_.key)
+      .window(SlidingEventTimeWindows.of(Time.minutes(duration), Time.seconds(slidingInterval)))
+      .reduce(new AverageWithKeyReducer)
+      .name(getKeyName() + " Average for " + duration + " min Sliding Window")
+  }
+
+  /**
+    * Feeds the keyed averages through EnrichMapper, which is given the median state name for this duration.
+    *
+    * @param params        job parameters (not used by this method)
+    * @param duration      window size in minutes, used to derive the median state name
+    * @param averageStream averages, e.g. as produced by createAverageStream
+    * @return stream of Prediction records emitted by EnrichMapper
+    */
+  def createPredictionStream(params: ParameterTool, duration: Int, averageStream: DataStream[AverageWithKey]): DataStream[Prediction] = {
+    averageStream
+      .keyBy(_.key)
+      .flatMap(new EnrichMapper(getStateName(duration)))
+      .name(getKeyName() + " Prediction values for " + duration + " min")
+  }
+
+  /**
+    * Sinks the predictions to Elasticsearch and, when the "debug" parameter is present, also to a text file.
+    *
+    * @param params              job parameters (passed to ElasticSearchSink; "debug" enables the file output)
+    * @param windowed_prediction predictions to write
+    * @param indexType           Elasticsearch index type; also used in the operator name and the debug file name
+    */
+  def createPredictionSink(params: ParameterTool, windowed_prediction: DataStream[Prediction], indexType: String): Unit = {
+    // Sink the Predicted value streams to Elasticsearch
+    windowed_prediction
+      .addSink(ElasticSearchSink[Prediction](params, Constants.ES_INDEX_NAME, indexType))
+      .name(getKeyName() + " Prediction Sink - ES - " + indexType + " min Window")
+
+    // Write to file for debug
+    if(params.has("debug")){
+      windowed_prediction.writeAsText(LOG_DIR + "/output_prediction/windowed"+ indexType +".csv", FileSystem.WriteMode.OVERWRITE)
+    }
+  }
+
+  /** Name of the median state for a window duration given in minutes, e.g. "median-5min". */
+  def getStateName(duration: Int): String = "median-" + duration + "min"
+}
\ No newline at end of file
diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/base/SensorEventAveragingJobBase.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/base/SensorEventAveragingJobBase.scala index a45104b..1277850 100644 --- a/src/main/scala/com/bhaskardivya/projects/smartgrid/base/SensorEventAveragingJobBase.scala +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/base/SensorEventAveragingJobBase.scala @@ -1,18 +1,18 @@ package com.bhaskardivya.projects.smartgrid.base import java.io.File -import java.util.concurrent.TimeUnit import com.bhaskardivya.projects.smartgrid.model._ -import com.bhaskardivya.projects.smartgrid.operators.{AverageAggregateWithKey, HBaseAsyncFunction, 
PredictionFunction} +import com.bhaskardivya.projects.smartgrid.operators._ import com.bhaskardivya.projects.smartgrid.pipeline._ -import com.bhaskardivya.projects.smartgrid.sinks.{HBaseOutputFormatAverageWithKey, PredictionElasticSearchSink, SensorEventElasticSearchSink} +import com.bhaskardivya.projects.smartgrid.sinks.ElasticSearchSink +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.core.fs.FileSystem +import org.apache.flink.runtime.state.filesystem.FsStateBackend import org.apache.flink.streaming.api.TimeCharacteristic -import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction -import org.apache.flink.streaming.api.scala._ -import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows +import org.apache.flink.streaming.api.scala.{DataStream, _} +import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows} import org.apache.flink.streaming.api.windowing.time.Time /** @@ -22,6 +22,11 @@ import org.apache.flink.streaming.api.windowing.time.Time */ abstract class SensorEventAveragingJobBase extends Serializable { + //register implicits for types + implicit val typeInfoAverageWithKey: TypeInformation[AverageWithKey] = TypeInformation.of(classOf[AverageWithKey]) + implicit val typeInfoPrediction2: TypeInformation[Prediction] = TypeInformation.of(classOf[Prediction]) + implicit val typeInfoTime: TypeInformation[Time] = TypeInformation.of(classOf[Time]) + private val LOG_DIR = "/data/" + getKeyName() /** @@ -33,7 +38,7 @@ abstract class SensorEventAveragingJobBase extends Serializable { /** * Method to prepare the raw events properly aggregated(sum) based on the key - * @param dataStream + * @param dataStream source raw data stream * @return */ def initializeFlow(dataStream: DataStream[SensorEvent]): DataStream[SensorEvent] @@ -41,8 +46,8 @@ abstract class SensorEventAveragingJobBase extends 
Serializable { /** * Method that returns the value of the key in the source datum * - * @param element - * @return Long Key value + * @param element SensorEvent record + * @return Long Key value */ def getKey(element: SensorEvent): SensorKeyObject @@ -58,16 +63,20 @@ abstract class SensorEventAveragingJobBase extends Serializable { } } + // Sliding window durations in minutes + val windowDurations = List(1, 5, 15, 60, 120) + def main(args: Array[String]): Unit = { + //Create the log dirs try { new File(LOG_DIR).mkdir() + new File(LOG_DIR + "/state/").mkdir() new File(LOG_DIR + "/input/").mkdir() new File(LOG_DIR + "/output_avg/").mkdir() - new File(LOG_DIR + "/output_enriched/").mkdir() new File(LOG_DIR + "/output_prediction/").mkdir() } catch { - case e: Exception => println("Directories already created") + case e: Exception => println("Directories already created" + e.getMessage) } // parse parameters @@ -79,187 +88,132 @@ abstract class SensorEventAveragingJobBase extends Serializable { // will be using the timestamp from the records env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + //env.enableCheckpointing(10000) // checkpoint every 10000 msecs + //env.setStateBackend(new FsStateBackend(LOG_DIR +"/state/")) + // Get the stream according to params - val stream: DataStream[SensorEvent] = SourceChooser.from(env, params).name("Sensor Source") + val rawStream: DataStream[SensorEvent] = SourceChooser.from(env, params).name("Sensor Source with Timestamp") - val withTimestamps = stream - .assignTimestampsAndWatermarks(SensorEvent.tsAssigner()) - .name("Source with Timestamp") + // Create a Global Window for work values which will output the missing load values + val averageUsingWorkValues: DataStream[AverageWithKey] = rawStream + .filter(_.property == Constants.PROPERTY_WORK) + .keyBy(e => getKey(e)) + .countWindow(2, 1) + .process(new WorkValueProcessWindow) + .flatMap(new WorkValueFlatMap(60)(keyGetter)) //60 seconds // Create a stream with sum 
according to the key specified - val initializedFlow = initializeFlow(withTimestamps) + val initializedFlow = if (params.has("deduplicate")) initializeFlow(rawStream) else rawStream + + val averageWithKeys = initializedFlow + .map(e => AverageWithKey(keyGetter(e), Slice(Time.minutes(1))(e.timestamp), Average(e.value, 1))) + // Assumption: If both the Load values and work Values are available in a slice, + // Average of them will still be closer to the real measurement + .union(averageUsingWorkValues) + + //create median states for various window duration + createMedianState(averageWithKeys) + + // Create the average sliding window stream and corresponding Prediction Stream + val windowed_average_1min = createAverageStream(params, 1, averageWithKeys) + if (params.getBoolean("sink.1min", false)) { + val prediction_1min = createPredictionStream(params, 1, windowed_average_1min) + createPredictionSink(params, prediction_1min, Constants.ES_INDEX_TYPE_1MIN) + } - // Streams for each window duration for the average - val avg_windowed1min = initializedFlow - .keyBy(keyGetter(_)) - .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(Constants.SLIDING_INTERVAL))) - .aggregate(new AverageAggregateWithKey(keyGetter)) - .name(getKeyName() + "Average for 1 min Window") + val windowed_average_5min = createAverageStream(params, 5, windowed_average_1min) + if(params.getBoolean("sink.5min", false)) { + val prediction_5min = createPredictionStream(params, 5, windowed_average_5min) + createPredictionSink(params, prediction_5min, Constants.ES_INDEX_TYPE_5MIN) + } - val avg_windowed5min = avg_windowed1min - .keyBy(_.key) - .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.seconds(Constants.SLIDING_INTERVAL))) - .reduce(AverageWithKey.reducer) - .name(getKeyName() + "Average for 5 min Window") + val windowed_average_15min = createAverageStream(params, 15, windowed_average_5min) + if(params.getBoolean("sink.15min", false)) { + val prediction_15min = 
createPredictionStream(params, 15, windowed_average_15min) + createPredictionSink(params, prediction_15min, Constants.ES_INDEX_TYPE_15MIN) + } - val avg_windowed15min = avg_windowed5min - .keyBy(_.key) - .window(SlidingEventTimeWindows.of(Time.minutes(15), Time.seconds(Constants.SLIDING_INTERVAL))) - .reduce(AverageWithKey.reducer) - .name(getKeyName() + "Average for 15 min Window") + val windowed_average_60min = createAverageStream(params, 60, windowed_average_15min) + if(params.getBoolean("sink.60min", false)) { + val prediction_60min = createPredictionStream(params, 60, windowed_average_60min) + createPredictionSink(params, prediction_60min, Constants.ES_INDEX_TYPE_60MIN) + } - val avg_windowed60min = avg_windowed15min - .keyBy(_.key) - .window(SlidingEventTimeWindows.of(Time.minutes(60), Time.seconds(Constants.SLIDING_INTERVAL))) - .reduce(AverageWithKey.reducer) - .name(getKeyName() + "Average for 60 min Window") + val windowed_average_120min = createAverageStream(params, 120, windowed_average_60min) + if(params.getBoolean("sink.120min", false)) { + val prediction_120min = createPredictionStream(params, 120, windowed_average_120min) + createPredictionSink(params, prediction_120min, Constants.ES_INDEX_TYPE_120MIN) + } - val avg_windowed120min = avg_windowed60min - .keyBy(_.key) - .window(SlidingEventTimeWindows.of(Time.minutes(120), Time.seconds(Constants.SLIDING_INTERVAL))) - .reduce(AverageWithKey.reducer) - .name(getKeyName() + "Average for 120 min Window") + if(params.getBoolean("sink.raw", false)) + initializedFlow + .addSink(ElasticSearchSink[SensorEvent](params, Constants.ES_INDEX_NAME, Constants.ES_INDEX_TYPE_RAW)) + .name("Sensor Raw to ES") - // Write to file for debug - val debug = params.has("debug") - if (debug) { - initializedFlow.writeAsCsv(LOG_DIR + "/input/initializedFlow.csv", FileSystem.WriteMode.OVERWRITE).name("Debug Initialized Flow") - avg_windowed1min.writeAsCsv(LOG_DIR + "/output_avg/windowed1min.csv", 
FileSystem.WriteMode.OVERWRITE).name("Debug Avg windowed 1 Min") - avg_windowed5min.writeAsCsv(LOG_DIR + "/output_avg/windowed5min.csv", FileSystem.WriteMode.OVERWRITE).name("Debug Avg windowed 5 Min") - avg_windowed15min.writeAsCsv(LOG_DIR + "/output_avg/windowed15min.csv", FileSystem.WriteMode.OVERWRITE).name("Debug Avg windowed 15 Min") - avg_windowed60min.writeAsCsv(LOG_DIR + "/output_avg/windowed60min.csv", FileSystem.WriteMode.OVERWRITE).name("Debug Avg windowed 60 Min") - avg_windowed120min.writeAsCsv(LOG_DIR + "/output_avg/windowed120min.csv", FileSystem.WriteMode.OVERWRITE).name("Debug Avg windowed 120 Min") - } + env.execute("Sensor Event" + getKeyName() + " Prediction Job") + } - // Write to HBase for each window duration - avg_windowed1min.addSink(new OutputFormatSinkFunction[AverageWithKey](new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_1MIN, getTargetColumnFamily()))) - .name(getKeyName() + " Average - HBase - 1 Min Window") - - avg_windowed5min.addSink(new OutputFormatSinkFunction[AverageWithKey](new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_5MIN, getTargetColumnFamily()))) - .name(getKeyName() + " Average - HBase - 5 Min Window") - - avg_windowed15min.addSink(new OutputFormatSinkFunction[AverageWithKey](new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_15MIN, getTargetColumnFamily()))) - .name(getKeyName() + " Average - HBase - 15 Min Window") - - avg_windowed60min.addSink(new OutputFormatSinkFunction[AverageWithKey](new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_60MIN, getTargetColumnFamily()))) - .name(getKeyName() + " Average - HBase - 60 Min Window") - - avg_windowed120min.addSink(new OutputFormatSinkFunction[AverageWithKey](new HBaseOutputFormatAverageWithKey().of(Constants.TABLE_120MIN, getTargetColumnFamily()))) - .name(getKeyName() + " Average - HBase - 120 Min Window") - - val timeout = params.getLong("timeout", 3000000L) // 300 seconds timeout - - // Enrich the average calculation with the median value 
for each stream of different window duration - val windowed1min_enriched: DataStream[(AverageWithKey, MedianLoad)] = AsyncDataStream.orderedWait( - avg_windowed1min, - new HBaseAsyncFunction(Constants.TABLE_1MIN, getTargetColumnFamily()), - timeout, - TimeUnit.MILLISECONDS, - 1 - ) - .startNewChain() - .name(getKeyName() + " Enriched - 1 Min Window") - - val windowed5min_enriched: DataStream[(AverageWithKey, MedianLoad)] = AsyncDataStream.orderedWait( - avg_windowed5min, - new HBaseAsyncFunction(Constants.TABLE_5MIN, getTargetColumnFamily()), - timeout, - TimeUnit.MILLISECONDS, - 1 - ) - .startNewChain() - .name(getKeyName() + " Enriched - 5 Min Window") - - val windowed15min_enriched: DataStream[(AverageWithKey, MedianLoad)] = AsyncDataStream.orderedWait( - avg_windowed15min, - new HBaseAsyncFunction(Constants.TABLE_15MIN, getTargetColumnFamily()), - timeout, - TimeUnit.MILLISECONDS, - 1 - ) - .startNewChain() - .name(getKeyName() + " Enriched - 15 Min Window") - - val windowed60min_enriched: DataStream[(AverageWithKey, MedianLoad)] = AsyncDataStream.orderedWait( - avg_windowed60min, - new HBaseAsyncFunction(Constants.TABLE_60MIN, getTargetColumnFamily()), - timeout, - TimeUnit.MILLISECONDS, - 1 - ) - .startNewChain() - .name(getKeyName() + " Enriched - 60 Min Window") - - val windowed120min_enriched: DataStream[(AverageWithKey, MedianLoad)] = AsyncDataStream.orderedWait( - avg_windowed120min, - new HBaseAsyncFunction(Constants.TABLE_120MIN, getTargetColumnFamily()), - timeout, - TimeUnit.MILLISECONDS, - 1 - ) - .startNewChain() - .name(getKeyName() + " Enriched - 120 Min Window") - - if (debug) { - windowed1min_enriched.writeAsCsv(LOG_DIR + "/output_enriched/windowed1min.csv", FileSystem.WriteMode.OVERWRITE).name("Debug Enriched 1 Min Window") - windowed5min_enriched.writeAsCsv(LOG_DIR + "/output_enriched/windowed5min.csv", FileSystem.WriteMode.OVERWRITE).name("Debug Enriched 5 Min Window") - windowed15min_enriched.writeAsCsv(LOG_DIR + 
"/output_enriched/windowed15min.csv", FileSystem.WriteMode.OVERWRITE).name("Debug Enriched 15 Min Window") - windowed60min_enriched.writeAsCsv(LOG_DIR + "/output_enriched/windowed60min.csv", FileSystem.WriteMode.OVERWRITE).name("Debug Enriched 60 Min Window") - windowed120min_enriched.writeAsCsv(LOG_DIR + "/output_enriched/windowed120min.csv", FileSystem.WriteMode.OVERWRITE).name("Debug Enriched 120 Min Window") - } + /** + * Helper function to create median MapState for each window duration + * @param averageWithKeyStream DataStream of AverageWithKey + */ + def createMedianState(averageWithKeyStream: DataStream[AverageWithKey]): Unit = { - // Create the predicted value streams - val windowed1min_prediction = windowed1min_enriched - .map(new PredictionFunction(entity = getKeyName(), slidingWindow = Time.minutes(1).toMilliseconds)) + // Streams for each window duration for the average + windowDurations.foreach(duration => { - val windowed5min_prediction = windowed5min_enriched - .map(new PredictionFunction(entity = getKeyName(), slidingWindow = Time.minutes(5).toMilliseconds)) + val stateName = getStateName(duration) - val windowed15min_prediction = windowed15min_enriched - .map(new PredictionFunction(entity = getKeyName(), slidingWindow = Time.minutes(15).toMilliseconds)) + val avg_windowed = averageWithKeyStream + .keyBy(_.key) + .window(TumblingEventTimeWindows.of(Time.minutes(duration))) + .reduce(new AverageWithKeyReducer) + .name(getKeyName() + " Average for " + duration + " min Tumbling Window") - val windowed60min_prediction = windowed60min_enriched - .map(new PredictionFunction(entity = getKeyName(), slidingWindow = Time.minutes(60).toMilliseconds)) + // Store median as operator state + avg_windowed + .keyBy(_.key) + .flatMap(new MedianWithKeyMapper(stateName)) + .name(getKeyName() + " Median state for " + duration + " min Tumbling Window") - val windowed120min_prediction = windowed120min_enriched - .map(new PredictionFunction(entity = getKeyName(), 
slidingWindow = Time.minutes(120).toMilliseconds)) + }) + } - // Sink the Predicted value streams to Elasticsearch - windowed1min_prediction - .addSink(PredictionElasticSearchSink(params,Constants.ES_INDEX_NAME, Constants.ES_INDEX_TYPE_1MIN)) - .name(getKeyName() + " Prediction Sink - ES - 1 min Window") - - windowed5min_prediction - .addSink(PredictionElasticSearchSink(params,Constants.ES_INDEX_NAME, Constants.ES_INDEX_TYPE_5MIN)) - .name(getKeyName() + " Prediction Sink - ES - 5 min Window") - - windowed15min_prediction - .addSink(PredictionElasticSearchSink(params,Constants.ES_INDEX_NAME, Constants.ES_INDEX_TYPE_15MIN)) - .name(getKeyName() + " Prediction Sink - ES - 15 min Window") - - windowed60min_prediction - .addSink(PredictionElasticSearchSink(params,Constants.ES_INDEX_NAME, Constants.ES_INDEX_TYPE_60MIN)) - .name(getKeyName() + " Prediction Sink - ES - 60 min Window") - - windowed120min_prediction - .addSink(PredictionElasticSearchSink(params,Constants.ES_INDEX_NAME, Constants.ES_INDEX_TYPE_120MIN)) - .name(getKeyName() + " Prediction Sink - ES - 120 min Window") - - if (debug) { - windowed1min_prediction.writeAsCsv(LOG_DIR + "/output_prediction/windowed1min.csv", FileSystem.WriteMode.OVERWRITE) - windowed5min_prediction.writeAsCsv(LOG_DIR + "/output_prediction/windowed5min.csv", FileSystem.WriteMode.OVERWRITE) - windowed15min_prediction.writeAsCsv(LOG_DIR + "/output_prediction/windowed15min.csv", FileSystem.WriteMode.OVERWRITE) - windowed60min_prediction.writeAsCsv(LOG_DIR + "/output_prediction/windowed60min.csv", FileSystem.WriteMode.OVERWRITE) - windowed120min_prediction.writeAsCsv(LOG_DIR + "/output_prediction/windowed120min.csv", FileSystem.WriteMode.OVERWRITE) - } + def createAverageStream(params: ParameterTool, duration: Int, sourceStream: DataStream[AverageWithKey]): DataStream[AverageWithKey] = { + + val slidingInterval = params.getInt("sliding.interval", Constants.SLIDING_INTERVAL) + + val windowed_average = sourceStream + .keyBy(_.key) + // Map 
the Correct Slice duration + .map(e => AverageWithKey(e.key, Slice(Time.minutes(duration))(e.slice.timestamp), e.average)) + .keyBy(_.key) + .window(SlidingEventTimeWindows.of(Time.minutes(duration), Time.seconds(slidingInterval))) + .reduce(new AverageWithKeyReducer) - //Sink the original feed into ES as well - initializedFlow - .addSink(SensorEventElasticSearchSink(params, Constants.ES_INDEX_NAME, Constants.ES_INDEX_TYPE_RAW)) - .name("Sensor Raw to ES") + windowed_average + } - env.execute("Sensor Event" + getKeyName() + " Prediction Job (Kafka to HBase Averages + Prediction to ES)") + def createPredictionStream(params: ParameterTool, duration: Int, averageStream: DataStream[AverageWithKey]): DataStream[Prediction] = { + val windowed_prediction: DataStream[Prediction] = averageStream + .keyBy(_.key) + .flatMap(new EnrichMapper(getStateName(duration))) + .name(getKeyName() + " Prediction values for " + duration + " min") + + windowed_prediction + } + def createPredictionSink(params: ParameterTool, windowed_prediction: DataStream[Prediction], indexType: String) = { + // Sink the Predicted value streams to Elasticsearch + windowed_prediction + .addSink(ElasticSearchSink[Prediction](params, Constants.ES_INDEX_NAME, indexType)) + .name(getKeyName() + " Prediction Sink - ES - " + indexType + " min Window") + + // Write to file for debug + if(params.has("debug")){ + windowed_prediction.writeAsText(LOG_DIR + "/output_prediction/windowed"+ indexType +".csv", FileSystem.WriteMode.OVERWRITE) + } } + def getStateName(duration: Int): String = "median-" + duration + "min" } \ No newline at end of file diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/job/HouseAveragingJob.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/job/HouseAveragingJob.scala index 9254f60..f2a83cf 100644 --- a/src/main/scala/com/bhaskardivya/projects/smartgrid/job/HouseAveragingJob.scala +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/job/HouseAveragingJob.scala @@ -12,7 
+12,7 @@ object HouseAveragingJob extends SensorEventAveragingJobBase with Serializable{ override def getTargetColumnFamily(): String = Constants.HOUSE_CF - override def initializeFlow(dataStream: DataStream[SensorEvent]) = { + override def initializeFlow(dataStream: DataStream[SensorEvent]): DataStream[SensorEvent] = { // Sum the values of all the plugs in a house with the same time stamp dataStream .keyBy("house_id", "timestamp") diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/job/PlugAveragingJob.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/job/PlugAveragingJob.scala index 189ba10..8e7671e 100644 --- a/src/main/scala/com/bhaskardivya/projects/smartgrid/job/PlugAveragingJob.scala +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/job/PlugAveragingJob.scala @@ -12,11 +12,12 @@ object PlugAveragingJob extends SensorEventAveragingJobBase with Serializable{ override def getTargetColumnFamily(): String = Constants.PLUG_CF - override def initializeFlow(dataStream: DataStream[SensorEvent]) = { - // Sum all the multiple values with the same timestamp for a given plug of a given house + override def initializeFlow(dataStream: DataStream[SensorEvent]): DataStream[SensorEvent] = { + // De-duplicate values with the same timestamp for a given plug of a given house dataStream - .keyBy("house_id", "plug_id", "timestamp") - .sum("value") - .name("Aggregated by Plug Data") + .filter(_.property == Constants.PROPERTY_LOAD) + .keyBy("house_id", "household_id" , "plug_id", "timestamp") + .reduce((_,b) => b) + .name("De-duplicated Raw stream") } } diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/job/PlugAveragingJob5Min.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/job/PlugAveragingJob5Min.scala new file mode 100644 index 0000000..378715c --- /dev/null +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/job/PlugAveragingJob5Min.scala @@ -0,0 +1,21 @@ +package com.bhaskardivya.projects.smartgrid.job + +import 
com.bhaskardivya.projects.smartgrid.base.PredictionJobSingleWindowBase +import com.bhaskardivya.projects.smartgrid.model.{Constants, SensorEvent, SensorKeyObject} +import org.apache.flink.streaming.api.scala.DataStream + +object PlugAveragingJob5Min extends PredictionJobSingleWindowBase with Serializable{ + + override def getKeyName(): String = "Plug" + + override def getKey(element: SensorEvent): SensorKeyObject = SensorKeyObject(element.house_id, element.household_id, element.plug_id) + + override def initializeFlow(dataStream: DataStream[SensorEvent]): DataStream[SensorEvent] = { + // De-duplicate values with the same timestamp for a given plug of a given house + dataStream + .filter(_.property == Constants.PROPERTY_LOAD) + .keyBy("house_id", "household_id" , "plug_id", "timestamp") + .reduce((_,b) => b) + .name("De-duplicated Raw stream") + } +} diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Average.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Average.scala index 43edfa9..4c38f65 100644 --- a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Average.scala +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Average.scala @@ -1,5 +1,12 @@ package com.bhaskardivya.projects.smartgrid.model +import org.apache.flink.streaming.api.scala._ + +/** + * Class to hold the sum and count + * @param sum Aggregation of load values + * @param count Number of load values + */ case class Average(var sum: Double, var count: Long){ def add(that: Average): Average = { Average(this.sum + that.sum, this.count + that.count) @@ -8,4 +15,8 @@ case class Average(var sum: Double, var count: Long){ def +(that: Average): Average = { this.add(that) } + + def avg: Double = { + sum/count + } } diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/AverageWithKey.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/AverageWithKey.scala index 154f69e..d3bcef7 100644 --- 
a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/AverageWithKey.scala +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/AverageWithKey.scala @@ -1,27 +1,28 @@ package com.bhaskardivya.projects.smartgrid.model import com.gotometrics.orderly.DoubleWritableRowKey +import org.apache.flink.streaming.api.windowing.time.Time import org.apache.hadoop.io.DoubleWritable +import org.apache.flink.streaming.api.scala._ -case class AverageWithKey(var key: SensorKeyObject, var sum: Double, var count: Long, eventTimestamp: Long = Constants.KEY_NO_VALUE){ - val averageValue: Double = sum / count +case class AverageWithKey(var key: SensorKeyObject, var slice: Slice, average: Average){ + def averageValue = average.avg def add(that: AverageWithKey): AverageWithKey = { - // To ensure that we do no propagate "-1" (Constants.KEY_NO_VALUE) (initial accumulator value) - if(this.key.house_id > that.key.house_id) - AverageWithKey(this.key, this.sum + that.sum, this.count + that.count, Math.max(this.eventTimestamp, that.eventTimestamp)) - else - AverageWithKey(that.key, this.sum + that.sum, this.count + that.count, Math.max(this.eventTimestamp, that.eventTimestamp)) + AverageWithKey(this.key, this.slice, this.average + that.average) } def +(that: AverageWithKey): AverageWithKey = { this.add(that) } + /* HBase related functions start */ + @deprecated def toHBaseColumnName(): String = { - key.toColumnString() + key.toColumnString } + @deprecated def bytesRowKey(): Array[Byte] = { // Original Object that will be serialized val rowkeyVal: DoubleWritable = new DoubleWritable(toHBaseColumnValue().asInstanceOf[java.lang.Double]) @@ -34,17 +35,19 @@ case class AverageWithKey(var key: SensorKeyObject, var sum: Double, var count: * Otherwise, DoubleColumnInterpreter will not be able to read column value * @return */ + @deprecated def toHBaseColumnValue(): Double = { //this.sum + Constants.DELIMITER + this.count averageValue } + @deprecated def toHBaseLongColumnValue(): Long = { 
//this.sum + Constants.DELIMITER + this.count (averageValue * 1000).toLong } - def fromHBaseColumnValue(column: String, value: String): AverageWithKey = { + /*def fromHBaseColumnValue(column: String, value: String): AverageWithKey = { //val fields = value.split(Constants.DELIMITER) //AverageWithKey(column, fields(0).toDouble, fields(2).toLong) var sumValue = 0.0 @@ -54,11 +57,15 @@ case class AverageWithKey(var key: SensorKeyObject, var sum: Double, var count: case e: Exception => sumValue = 0.0 } AverageWithKey(SensorKeyObject.fromColumnString(column), sumValue, 1, Constants.KEY_NO_VALUE) - } + }*/ } object AverageWithKey { def reducer = { (a: AverageWithKey, b: AverageWithKey) => a+b } + + def getInitialValue(): AverageWithKey = { + new AverageWithKey(SensorKeyObject(-1), Slice(Time.milliseconds(-1))(-1), Average(0.0, 0) ) + } } diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Constants.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Constants.scala index 31e3755..4a36a72 100644 --- a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Constants.scala +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Constants.scala @@ -29,4 +29,13 @@ object Constants { // Sensor Key Object val KEY_NO_VALUE: Long = -1 + val PROPERTY_LOAD: Int = 1 + val PROPERTY_WORK: Int = 0 + + // TDigest + val TDIGEST_COMPRESSION = 100 + + // This is the base timestamp ... 
the epoch of the world with flink + val BASE_TIMESTAMP = 1377900000L // TODO: currently set 0 to align with Unix Epoch + } diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/MedianLoadWithKey.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/MedianLoadWithKey.scala new file mode 100644 index 0000000..953820b --- /dev/null +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/MedianLoadWithKey.scala @@ -0,0 +1,32 @@ +package com.bhaskardivya.projects.smartgrid.model + +import com.tdunning.math.stats.TDigest + +case class MedianLoadWithKey(var key: SensorKeyObject, slice: Slice, digest: TDigest){ + def medianLoad : Double = digest.quantile(0.5) + + def add(averageWithKey: AverageWithKey) = { + this.digest.add(averageWithKey.averageValue) + this + } + + def add(other: MedianLoadWithKey) = { + this.digest.add(other.digest) + this + } + + def +(other: MedianLoadWithKey) = add(other) +} + +object MedianLoadWithKey { + def reducer = { + (a: MedianLoadWithKey, b: MedianLoadWithKey) => a+b + } + + def fromAverageWithKey(averageWithKey: AverageWithKey) = { + val tDigest: TDigest = TDigest.createDigest(Constants.TDIGEST_COMPRESSION) + tDigest.add(averageWithKey.averageValue) + + MedianLoadWithKey(averageWithKey.key, averageWithKey.slice, tDigest) + } +} \ No newline at end of file diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Prediction.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Prediction.scala index cf49348..9cddc00 100644 --- a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Prediction.scala +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Prediction.scala @@ -1,14 +1,12 @@ package com.bhaskardivya.projects.smartgrid.model -import java.util.Date - +import com.bhaskardivya.projects.smartgrid.util.JSONTrait import org.apache.sling.commons.json.JSONObject - -case class Prediction(averageWithKey: AverageWithKey, medianLoad: MedianLoad, key: String, slidingWindow: 
Long, predictedValue: Double){ +case class Prediction(var averageWithKey: AverageWithKey, var medianLoad: MedianLoad, var predictedLoad: Double) extends JSONTrait { def toJSONString(): String = { - toJSON().toString() + toJSON2().toString() } def toJSON(): JSONObject = { @@ -17,29 +15,73 @@ case class Prediction(averageWithKey: AverageWithKey, medianLoad: MedianLoad, ke //averageWithKey object val averageWithKeyJSON = averageWithKey.key.toJSON - averageWithKeyJSON.put("sum", averageWithKey.sum) - averageWithKeyJSON.put("count", averageWithKey.count) - averageWithKeyJSON.put("avg", averageWithKey.averageValue) - averageWithKeyJSON.put("eventTimestamp", averageWithKey.eventTimestamp) + averageWithKeyJSON.put("sum", normalise_double(averageWithKey.average.sum)) + averageWithKeyJSON.put("count", averageWithKey.average.count) + averageWithKeyJSON.put("avg", normalise_double(averageWithKey.averageValue)) + averageWithKeyJSON.put("eventTimestamp", averageWithKey.slice.timestamp) json.put("averageWithKey", averageWithKeyJSON) - //mediaLoad + //medianLoad val medianLoadJSON = new JSONObject() - medianLoadJSON.put("load", medianLoad.load) + medianLoadJSON.put("load", normalise_double(medianLoad.load)) json.put("medianLoad", medianLoadJSON) //key or entity - json.put("key", key) + json.put("house_id", averageWithKey.key.house_id) + json.put("household_id", averageWithKey.key.household_id) + json.put("plug_id", averageWithKey.key.plug_id) + + //sliding window duration + json.put("slidingWindowDuration", averageWithKey.slice.size.toMilliseconds) + json.put("slice-start", averageWithKey.slice.ts_start) + json.put("slice-stop", averageWithKey.slice.ts_stop) + + //Predicted value + json.put("predictedValue", normalise_double(predictedLoad)) + + // Current Time + json.put("current-timestamp", System.currentTimeMillis) + + json + } + + def normalise_double(dbl: Double): Double = { + if(dbl < 1e-6) { + 0.000001 + }else{ + dbl + } + } + + def toJSON2(): JSONObject = { + //main object 
+ val json = new JSONObject() + + //averageWithKey object + val averageWithKeyJSON = averageWithKey.key.toJSON + averageWithKeyJSON.put("sum", normalise_double(averageWithKey.average.sum)) + averageWithKeyJSON.put("count", averageWithKey.average.count) + averageWithKeyJSON.put("avg", normalise_double(averageWithKey.averageValue)) + averageWithKeyJSON.put("eventTimestamp", averageWithKey.slice.timestamp) + json.put("averageWithKey", averageWithKeyJSON) + + //medianLoad + val medianLoadJSON = new JSONObject() + medianLoadJSON.put("load", normalise_double(medianLoad.load)) + json.put("medianLoad", medianLoadJSON) //sliding window duration - json.put("slidingWindowDuration", slidingWindow) + json.put("slidingWindowDuration", averageWithKey.slice.size.toMilliseconds) + json.put("slice-start", averageWithKey.slice.ts_start) + json.put("slice-stop", averageWithKey.slice.ts_stop) //Predicted value - json.put("predictedValue", predictedValue) + json.put("predictedValue", normalise_double(predictedLoad)) + json.put("predicted-slice-start", averageWithKey.slice.predicting_for_slice.ts_start) // Current Time - json.put("timestamp", new Date().getTime) + json.put("current-timestamp", System.currentTimeMillis) json } -} \ No newline at end of file +} diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/SensorEvent.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/SensorEvent.scala index bcc7fab..b839a40 100644 --- a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/SensorEvent.scala +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/SensorEvent.scala @@ -1,5 +1,6 @@ package com.bhaskardivya.projects.smartgrid.model +import com.bhaskardivya.projects.smartgrid.util.JSONTrait import org.apache.commons.lang3.builder.ToStringBuilder import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema @@ -13,9 +14,9 @@ import org.apache.sling.commons.json.JSONObject /** 
* id, timestamp, value, property, plug_id, household_id, house_id */ -case class SensorEvent(var id: Long,var timestamp: Long, var value: Double, var property: Int, var plug_id: Long, var household_id: Long, var house_id: Long){ +case class SensorEvent(var id: Long,var timestamp: Long, var value: Double, var property: Int, var plug_id: Long, var household_id: Long, var house_id: Long) extends JSONTrait { - def adjustEventTimestamp(millis: Long) = { + def adjustEventTimestamp(millis: Long): Unit = { this.timestamp = this.timestamp + (millis / 1000) } @@ -30,7 +31,7 @@ case class SensorEvent(var id: Long,var timestamp: Long, var value: Double, var val json: JSONObject = new JSONObject() json.put("id", this.id) json.put("timestamp", this.timestamp) - json.put("value", this.value) + json.put("value", normalise_double(this.value)) json.put("property", this.property) json.put("plug_id", this.plug_id) json.put("household_id", this.household_id) @@ -38,6 +39,14 @@ case class SensorEvent(var id: Long,var timestamp: Long, var value: Double, var json } + + def normalise_double(dbl: Double): Double = { + if(dbl < 1e-6) { + 0.000001 + }else{ + dbl + } + } } object SensorEvent { @@ -56,7 +65,7 @@ object SensorEvent { new SensorEvent(id, timestamp, value, property, plug_id, household_id, house_id) } catch { - case e: Exception => null + case _ : Throwable => null } } @@ -69,7 +78,7 @@ object SensorEvent { def tsAssigner(): BoundedOutOfOrdernessTimestampExtractor[SensorEvent] = { new BoundedOutOfOrdernessTimestampExtractor[SensorEvent](MAX_DELAY) { - override def extractTimestamp(element: SensorEvent) = element.getTimeMillis() + override def extractTimestamp(element: SensorEvent): Long = element.getTimeMillis() } } } \ No newline at end of file diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/SensorKeyObject.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/SensorKeyObject.scala index 1fbce6d..ffed820 100644 --- 
a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/SensorKeyObject.scala +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/SensorKeyObject.scala @@ -1,12 +1,13 @@ package com.bhaskardivya.projects.smartgrid.model import org.apache.sling.commons.json.JSONObject +import org.apache.flink.streaming.api.scala._ class SensorKeyObject(var house_id: Long, var household_id: Long, var plug_id: Long) extends Serializable{ - override def toString: String = this.toColumnString() + override def toString: String = this.toColumnString - def toColumnString(): String = { + def toColumnString: String = { val str: StringBuilder = new StringBuilder if(this.house_id > Constants.KEY_NO_VALUE) @@ -23,11 +24,9 @@ class SensorKeyObject(var house_id: Long, var household_id: Long, var plug_id: L str.toString() } - def toJSONString(): String = { - toJSON().toString - } + def toJSONString: String = toJSON.toString - def toJSON(): JSONObject = { + def toJSON: JSONObject = { val json: JSONObject = new JSONObject() if(this.house_id > Constants.KEY_NO_VALUE) diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Slice.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Slice.scala new file mode 100644 index 0000000..ef991bd --- /dev/null +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/Slice.scala @@ -0,0 +1,71 @@ +package com.bhaskardivya.projects.smartgrid.model + +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.scala._ + +import scala.collection.immutable + +/** + * Class to represent a slice index of a size (aka window size) + * + * @param size Window size + * @param timestamp Event timestamp from the record in seconds + */ +case class Slice(size: Time)(var timestamp: Long) { + private val seconds_in_Day = 24 * 60 * 60 + + + def size_in_seconds: Long = size.toMilliseconds / 1000 + + def ts_start: Long = timestamp - (timestamp % size_in_seconds) + + def ts_stop: Long = ts_start + 
size_in_seconds - 1 + + def i: Long = ( ts_start - Constants.BASE_TIMESTAMP ) / size_in_seconds + + def num_slices_in(hr: Long): Long = (hr * 60 * 60) / size_in_seconds + + def num_slices_in_day: Long = num_slices_in(24) + + def j: immutable.IndexedSeq[Long] = { + val k = num_slices_in_day + (1L to ((i+2)/k)).map(n => (i+2 - n*k)) + } + + def start_time_of_day: Long = ts_start % seconds_in_Day + def stop_time_of_day: Long = ts_stop % seconds_in_Day + + def predicting_for_time_of_day: Long = (start_time_of_day + 2*size_in_seconds) % seconds_in_Day + def predicting_previous_slice: Long = (start_time_of_day - 2*size_in_seconds + seconds_in_Day) % seconds_in_Day + def predicting_for_slice: Slice = Slice(size)(ts_start + 2*size_in_seconds) + + override def toString : String = { + val str: StringBuilder = new StringBuilder + + str.append(size.toMilliseconds.toString) + str.append(Constants.DELIMITER) + str.append(ts_start.toString) + str.append(Constants.DELIMITER) + str.append(ts_stop.toString) + + str.toString() + } +} + +object Slice { + def from(size: Time)(sliceIndex: Long): Slice = { + val size_in_seconds = size.toMilliseconds / 1000 + + val ts_start = sliceIndex * size_in_seconds + Constants.BASE_TIMESTAMP + + //lets keep the event occurring just right in the middle of the slice index + val event_timestamp = ts_start + (size_in_seconds / 2) + + Slice(size)(event_timestamp) + } + + def from(size_in_seconds: Long)(sliceIndex: Long) : Slice = { + from(Time.seconds(size_in_seconds))(sliceIndex) + } + +} diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/TwoWorkEvents.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/TwoWorkEvents.scala new file mode 100644 index 0000000..456cbb2 --- /dev/null +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/TwoWorkEvents.scala @@ -0,0 +1,5 @@ +package com.bhaskardivya.projects.smartgrid.model + +case class TwoWorkEvents(sensorEvent1: SensorEvent, sensorEvent2: SensorEvent) { + def 
isValid: Boolean = sensorEvent1 != null && sensorEvent2 != null +} diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/model/WorkValueFlatMap.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/WorkValueFlatMap.scala new file mode 100644 index 0000000..bcae404 --- /dev/null +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/model/WorkValueFlatMap.scala @@ -0,0 +1,58 @@ +package com.bhaskardivya.projects.smartgrid.model + +import com.bhaskardivya.projects.smartgrid.base.AbstractKeyGetter +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.util.Collector + +class WorkValueFlatMap(size_in_seconds: Long)(keyGetter: AbstractKeyGetter) extends FlatMapFunction[TwoWorkEvents, AverageWithKey]{ + override def flatMap(value: TwoWorkEvents, out: Collector[AverageWithKey]): Unit = { + + if(!value.isValid) + return + + // TwoWorkEvents have records with the same key + // Find the slices between the two WorkEvents + // Generate AverageWithKey records for each of the slices + + val slice_range: Seq[Long] = getSliceRange(value) + + val averageLoad = getAverageLoad(value) + + // let's pre-create the objects so that we dont need to create it again for each collect call + val sensorKeyObject = keyGetter(value.sensorEvent1) + val average = Average(averageLoad, 1) + + + slice_range.foreach ( + slice_index => { + val averageWithKey = AverageWithKey(sensorKeyObject, Slice.from(size_in_seconds)(slice_index), average) + out.collect(averageWithKey) + } + ) + + } + + def getSliceRange(value: TwoWorkEvents) = { + val slice_range_a = Slice(Time.seconds(size_in_seconds))(value.sensorEvent1.timestamp).i + val slice_range_b = Slice(Time.seconds(size_in_seconds))(value.sensorEvent2.timestamp).i + + if(slice_range_a > slice_range_b){ + slice_range_b to slice_range_a + } else { + slice_range_a to slice_range_b + } + } + + def getAverageLoad(value: TwoWorkEvents) = { + //in kWH 
+ val work_diff = Math.abs(value.sensorEvent1.value - value.sensorEvent2.value) * 1000 + + //in seconds + val time_diff = Math.abs(value.sensorEvent1.timestamp - value.sensorEvent2.timestamp) + + val averageLoad = (work_diff.toDouble * 60 * 60) / time_diff + + averageLoad + } +} diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/AverageAggregateWithKey.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/AverageAggregateWithKey.scala deleted file mode 100644 index dbace07..0000000 --- a/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/AverageAggregateWithKey.scala +++ /dev/null @@ -1,26 +0,0 @@ -package com.bhaskardivya.projects.smartgrid.operators - -import com.bhaskardivya.projects.smartgrid.base.AbstractKeyGetter -import com.bhaskardivya.projects.smartgrid.model.{AverageWithKey, Constants, SensorEvent, SensorKeyObject} -import org.apache.flink.api.common.functions.AggregateFunction - -/** - * The accumulator is used to keep a running sum and a count. The [getResult] method - * computes the average. 
- */ -class AverageAggregateWithKey(keyGetter: AbstractKeyGetter) extends AggregateFunction[SensorEvent, AverageWithKey, AverageWithKey]{ - - override def createAccumulator(): AverageWithKey = AverageWithKey(SensorKeyObject(-1), 0.0, 0L, Constants.KEY_NO_VALUE) - - override def add(value: SensorEvent, accumulator: AverageWithKey): AverageWithKey = - AverageWithKey( - keyGetter(value), - accumulator.sum + value.value, - accumulator.count + 1L, - value.timestamp - ) - - override def getResult(accumulator: AverageWithKey): AverageWithKey = accumulator - - override def merge(a: AverageWithKey, b: AverageWithKey): AverageWithKey = a + b -} diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/AverageWithKeyReducer.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/AverageWithKeyReducer.scala new file mode 100644 index 0000000..0430f3e --- /dev/null +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/AverageWithKeyReducer.scala @@ -0,0 +1,11 @@ +package com.bhaskardivya.projects.smartgrid.operators + +import com.bhaskardivya.projects.smartgrid.model._ +import org.apache.flink.api.common.functions.ReduceFunction + +/** + * The reducer function for AverageWithKey object having same keys + */ +class AverageWithKeyReducer extends ReduceFunction[AverageWithKey]{ + override def reduce(a: AverageWithKey, b: AverageWithKey): AverageWithKey = AverageWithKey.reducer(a,b) +} diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/EnrichMapper.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/EnrichMapper.scala new file mode 100644 index 0000000..ae24aaa --- /dev/null +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/EnrichMapper.scala @@ -0,0 +1,59 @@ +package com.bhaskardivya.projects.smartgrid.operators + +import com.bhaskardivya.projects.smartgrid.model.{AverageWithKey, MedianLoad, MedianLoadWithKey, Prediction} +import com.tdunning.math.stats.TDigest +import 
org.apache.flink.api.common.functions.{RichFlatMapFunction, RichReduceFunction} +import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor} +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.scala.createTypeInformation +import org.apache.flink.util.Collector + +/** + * Rich Mapper function to get the predicted load values from the median state and Average Loads + */ +class EnrichMapper(stateName: String) extends RichFlatMapFunction[AverageWithKey, Prediction]{ + + private var digest: MapState[Long, TDigest] = _ + private var prediction2: Prediction = _ + + override def open(parameters: Configuration): Unit = { + val descriptor = new MapStateDescriptor[Long, TDigest](stateName, createTypeInformation[Long], createTypeInformation[TDigest]) + digest = getRuntimeContext.getMapState(descriptor) + } + + override def flatMap(value: AverageWithKey, out: Collector[Prediction]): Unit = { + // Get the TDigest Object for the SensorKey and slice predicting for + val currentDigest = digest.get(value.slice.predicting_for_time_of_day) + + // Get the Median Load Value + val medianLoad = + if(currentDigest == null) { + val previousDigest = digest.get(value.slice.predicting_previous_slice) + if(previousDigest == null) + value.averageValue + else + previousDigest.quantile(0.5) + }else{ + currentDigest.quantile(0.5) + } + + // Calculate the load prediction + val prediction = (value.averageValue + medianLoad) / 2.0 + + // Update the Slice index for prediction + //TODO: Testing //value.slice = value.slice.predicting_for_slice + + // Create the final Prediction Object to be collected + if (prediction2 == null) { + prediction2 = Prediction(value, MedianLoad(medianLoad), prediction) + } else { + prediction2.averageWithKey = value + prediction2.medianLoad = MedianLoad(medianLoad) + prediction2.predictedLoad = prediction + } + + 
if(prediction > 1e-6) // + out.collect(prediction2) + } +} diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/MedianAggregateWithKey.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/MedianAggregateWithKey.scala new file mode 100644 index 0000000..ebe47d3 --- /dev/null +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/MedianAggregateWithKey.scala @@ -0,0 +1,17 @@ +/* +package com.bhaskardivya.projects.smartgrid.operators + +import com.bhaskardivya.projects.smartgrid.model.{AverageWithKey, MedianLoadWithKey, SensorKeyObject} +import com.tdunning.math.stats.TDigest +import org.apache.flink.api.common.functions.AggregateFunction + +class MedianAggregateWithKey extends AggregateFunction[AverageWithKey, MedianLoadWithKey, MedianLoadWithKey]{ + override def createAccumulator() = new MedianLoadWithKey(SensorKeyObject(-1), TDigest.createDigest(100)) + + override def add(value: AverageWithKey, accumulator: MedianLoadWithKey) = accumulator.add(averageWithKey = value) + + override def getResult(accumulator: MedianLoadWithKey) = accumulator + + override def merge(a: MedianLoadWithKey, b: MedianLoadWithKey) = a+b +} +*/ diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/MedianWithKeyMapper.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/MedianWithKeyMapper.scala new file mode 100644 index 0000000..5adbd02 --- /dev/null +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/MedianWithKeyMapper.scala @@ -0,0 +1,39 @@ +package com.bhaskardivya.projects.smartgrid.operators + +import com.bhaskardivya.projects.smartgrid.model._ +import com.tdunning.math.stats.TDigest +import org.apache.flink.api.common.functions.RichFlatMapFunction +import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.scala._ +import 
org.apache.flink.util.Collector + +/** + * A Rich flatMap function to keep a running TDigest object for each key and each starting minutes of a day. + */ +class MedianWithKeyMapper(stateName: String) extends RichFlatMapFunction[AverageWithKey, AverageWithKey]{ + + private var digest: MapState[Long, TDigest] = _ + + override def open(parameters: Configuration): Unit = { + val descriptor = new MapStateDescriptor[Long, TDigest](stateName, createTypeInformation[Long], createTypeInformation[TDigest]) + descriptor.setQueryable(stateName+"-query") + digest = getRuntimeContext.getMapState(descriptor) + } + + override def flatMap(value: AverageWithKey, out: Collector[AverageWithKey]): Unit = { + val key = value.slice.start_time_of_day + var currentDigest = digest.get(key) + + if(currentDigest == null){ + currentDigest = TDigest.createDigest(Constants.TDIGEST_COMPRESSION) + } + + if(!value.average.avg.isNaN) { + currentDigest.add(value.average.avg) + digest.put(key, currentDigest) + } + + //out.collect(value) //Dont emit as no further processing is required on this stream + } +} diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/PredictionFunction.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/PredictionFunction.scala index c19b537..072c023 100644 --- a/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/PredictionFunction.scala +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/PredictionFunction.scala @@ -1,6 +1,7 @@ +/* package com.bhaskardivya.projects.smartgrid.operators -import com.bhaskardivya.projects.smartgrid.model.{AverageWithKey, MedianLoad, Prediction} +import com.bhaskardivya.projects.smartgrid.model.{AverageWithKey, MedianLoad, MedianLoadWithKey, Prediction} import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.streaming.api.windowing.time.Time @@ -9,13 +10,14 @@ import org.apache.flink.streaming.api.windowing.time.Time * @param entity The key for which prediction 
is made ie.House or Plug * @param slidingWindow */ -class PredictionFunction(entity: String, slidingWindow: Long) extends MapFunction[(AverageWithKey, MedianLoad), Prediction]{ - override def map(value: (AverageWithKey, MedianLoad)): Prediction = { +class PredictionFunction(entity: String, slidingWindow: Long) extends MapFunction[(AverageWithKey, MedianLoadWithKey), Prediction]{ + override def map(value: (AverageWithKey, MedianLoadWithKey)): Prediction = { val avg = value._1.averageValue - val median = value._2.load + val median = value._2.medianLoad val predictedValue: Double = (avg + median)/ 2.0 Prediction(value._1, value._2, entity, slidingWindow, predictedValue) } } +*/ diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/WorkValueProcessWindow.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/WorkValueProcessWindow.scala new file mode 100644 index 0000000..92dd193 --- /dev/null +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/operators/WorkValueProcessWindow.scala @@ -0,0 +1,16 @@ +package com.bhaskardivya.projects.smartgrid.operators + +import com.bhaskardivya.projects.smartgrid.model.{SensorEvent, SensorKeyObject, TwoWorkEvents} +import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction +import org.apache.flink.streaming.api.windowing.windows.GlobalWindow +import org.apache.flink.util.Collector + +class WorkValueProcessWindow extends ProcessWindowFunction[SensorEvent, TwoWorkEvents, SensorKeyObject, GlobalWindow] { + override def process(key: SensorKeyObject, context: Context, elements: Iterable[SensorEvent], out: Collector[TwoWorkEvents]): Unit = { + + elements match { + case x: Iterable[SensorEvent] if x.size == 2 => out.collect(TwoWorkEvents(x.toList(0), x.toList(1))) + case _ => println("Encountered non-pair CountWindow Work") + } + } +} diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/pipeline/DataCleanser.scala 
b/src/main/scala/com/bhaskardivya/projects/smartgrid/pipeline/DataCleanser.scala index 498c698..6d94286 100644 --- a/src/main/scala/com/bhaskardivya/projects/smartgrid/pipeline/DataCleanser.scala +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/pipeline/DataCleanser.scala @@ -5,7 +5,7 @@ import org.apache.flink.streaming.api.scala._ object DataCleanser { def clean(stream: DataStream[List[String]]): DataStream[List[String]] = { stream - .filter(_.length == 7) // There should be 7 fields in a record + .filter(_.lengthCompare(7) == 0) // There should be 7 fields in a record .filter(row => row(3).toInt == 0 && row(3).toInt == 1) // 'property' can have either 0 or 1 value } } diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/pipeline/SourceChooser.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/pipeline/SourceChooser.scala index 386be00..9c385b1 100644 --- a/src/main/scala/com/bhaskardivya/projects/smartgrid/pipeline/SourceChooser.scala +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/pipeline/SourceChooser.scala @@ -1,7 +1,7 @@ package com.bhaskardivya.projects.smartgrid.pipeline import com.bhaskardivya.projects.smartgrid.model.SensorEvent -import com.bhaskardivya.projects.smartgrid.sources.KafkaSource +import com.bhaskardivya.projects.smartgrid.sources.{FileSource, KafkaSource} import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} @@ -14,8 +14,9 @@ object SourceChooser { source match { case "kafka" => new KafkaSource().getSource(env, params) - /*case "file" => new FileSource().getSource(env, params) - case _ => new FileSource().getSource(env, params)*/ + case "file" => new FileSource().getSource(env, params) + case "simulated" => new FileSource().getSimulatedCSVSource(env, params) + case _ => new FileSource().getSource(env, params) } } } diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/SensorEventElasticSearchSink.scala 
b/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/ElasticSearchSink.scala similarity index 65% rename from src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/SensorEventElasticSearchSink.scala rename to src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/ElasticSearchSink.scala index 29b9724..7b06aa3 100644 --- a/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/SensorEventElasticSearchSink.scala +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/ElasticSearchSink.scala @@ -2,25 +2,27 @@ package com.bhaskardivya.projects.smartgrid.sinks import java.net.{InetAddress, InetSocketAddress} -import com.bhaskardivya.projects.smartgrid.model.SensorEvent +import com.bhaskardivya.projects.smartgrid.util.JSONTrait import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink -object SensorEventElasticSearchSink { +object ElasticSearchSink { - def apply(params: ParameterTool, esIndex: String, esIndexType: String): ElasticsearchSink[SensorEvent] ={ + def apply[T <: JSONTrait](params: ParameterTool, esIndex: String, esIndexType: String): ElasticsearchSink[T] ={ //Initialize Elastic search configuration val esClusterLocationIP = params.get("es.cluster.ip", "192.168.99.100") val esClusterLocationPort = params.getInt("es.cluster.port", 9300) + val esFlushMaxActions = params.getInt("bulk.flush.max.actions", 100) + val config = new java.util.HashMap[String, String] config.put("cluster.name", params.get("es.cluster.name", "docker-cluster")) // This instructs the sink to emit after every element, otherwise they would be buffered - config.put("bulk.flush.max.actions", "1") + config.put("bulk.flush.max.actions", esFlushMaxActions.toString) val transportAddresses = new java.util.ArrayList[InetSocketAddress] transportAddresses.add(new InetSocketAddress(InetAddress.getByName(esClusterLocationIP), esClusterLocationPort)) - new ElasticsearchSink(config, transportAddresses, new 
SensorEventElasticSearchSinkFunction(esIndex, esIndexType)) + new ElasticsearchSink(config, transportAddresses, new ElasticSearchSinkFunction[T](esIndex, esIndexType)) } -} +} \ No newline at end of file diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/PredictionElasticSearchSinkFunction.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/ElasticSearchSinkFunction.scala similarity index 68% rename from src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/PredictionElasticSearchSinkFunction.scala rename to src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/ElasticSearchSinkFunction.scala index 70ffb92..d43a0d7 100644 --- a/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/PredictionElasticSearchSinkFunction.scala +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/ElasticSearchSinkFunction.scala @@ -1,6 +1,7 @@ package com.bhaskardivya.projects.smartgrid.sinks import com.bhaskardivya.projects.smartgrid.model.Prediction +import com.bhaskardivya.projects.smartgrid.util.JSONTrait import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer} import org.elasticsearch.action.ActionRequest @@ -11,9 +12,9 @@ import org.elasticsearch.client.Requests * @param esIndex ElasticSearch Index name * @param esType ElasticSearch Index type */ -class PredictionElasticSearchSinkFunction(esIndex: String, esType: String) extends ElasticsearchSinkFunction[Prediction]{ +class ElasticSearchSinkFunction[T <: JSONTrait](esIndex: String, esType: String) extends ElasticsearchSinkFunction[T]{ - def createIndexRequest(element: Prediction): ActionRequest = { + def createIndexRequest(element: T): ActionRequest = { val json = element.toJSONString() Requests.indexRequest @@ -22,7 +23,7 @@ class PredictionElasticSearchSinkFunction(esIndex: String, esType: String) exten .source(json) } - override def process(element: Prediction, ctx: 
RuntimeContext, indexer: RequestIndexer): Unit = { + override def process(element: T, ctx: RuntimeContext, indexer: RequestIndexer): Unit = { indexer.add(createIndexRequest(element)) } diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/HBaseOutputFormatAverageWithKey.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/HBaseOutputFormatAverageWithKey.scala index 09d529a..193ff10 100644 --- a/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/HBaseOutputFormatAverageWithKey.scala +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/HBaseOutputFormatAverageWithKey.scala @@ -48,7 +48,7 @@ class HBaseOutputFormatAverageWithKey extends OutputFormat[AverageWithKey] { @throws[IOException] override def writeRecord(record: AverageWithKey): Unit = { - val startTime = System.currentTimeMillis(); + val startTime = System.currentTimeMillis() // Make sure that the rowkey is sorted by the average values val put = new Put(Bytes.toBytes(taskNumber + rowNumber) ++ record.bytesRowKey()) put.setDurability(Durability.SKIP_WAL) diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/HBaseSink.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/HBaseSink.scala deleted file mode 100644 index 920f0aa..0000000 --- a/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/HBaseSink.scala +++ /dev/null @@ -1,7 +0,0 @@ -package com.bhaskardivya.projects.smartgrid.sinks - -class HBaseSink[IN] { - def getSink(): Unit ={ - - } -} diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/PredictionElasticSearchSink.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/PredictionElasticSearchSink.scala deleted file mode 100644 index 75f807b..0000000 --- a/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/PredictionElasticSearchSink.scala +++ /dev/null @@ -1,33 +0,0 @@ -package com.bhaskardivya.projects.smartgrid.sinks - -import java.net.{InetAddress, InetSocketAddress} - -import 
com.bhaskardivya.projects.smartgrid.model.Prediction -import org.apache.flink.api.java.utils.ParameterTool -import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink - -object PredictionElasticSearchSink { - - def apply(params: ParameterTool, esIndex: String, esIndexType: String): ElasticsearchSink[Prediction] ={ - //Initialize Elastic search configuration - val esClusterLocationIP = params.get("es.cluster.ip", "192.168.99.100") - val esClusterLocationPort = params.getInt("es.cluster.port", 9300) - val config = new java.util.HashMap[String, String] - config.put("cluster.name", params.get("es.cluster.name", "docker-cluster")) - // This instructs the sink to emit after every element, otherwise they would be buffered - config.put("bulk.flush.max.actions", "1") - - val transportAddresses = new java.util.ArrayList[InetSocketAddress] -/* - transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), esClusterLocationPort)) - transportAddresses.add(new InetSocketAddress(InetAddress.getByName("172.17.0.2"), esClusterLocationPort)) - transportAddresses.add(new InetSocketAddress(InetAddress.getByName("0.0.0.0"), esClusterLocationPort)) - transportAddresses.add(new InetSocketAddress(InetAddress.getByName("localhost"), esClusterLocationPort)) -*/ - transportAddresses.add(new InetSocketAddress(InetAddress.getByName("192.168.99.100"), esClusterLocationPort)) - transportAddresses.add(new InetSocketAddress(InetAddress.getByName(esClusterLocationIP), esClusterLocationPort)) - - new ElasticsearchSink(config, transportAddresses, new PredictionElasticSearchSinkFunction(esIndex, esIndexType)) - } - -} diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/SensorEventElasticSearchSinkFunction.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/SensorEventElasticSearchSinkFunction.scala deleted file mode 100644 index aca041c..0000000 --- 
a/src/main/scala/com/bhaskardivya/projects/smartgrid/sinks/SensorEventElasticSearchSinkFunction.scala +++ /dev/null @@ -1,29 +0,0 @@ -package com.bhaskardivya.projects.smartgrid.sinks - -import com.bhaskardivya.projects.smartgrid.model.SensorEvent -import org.apache.flink.api.common.functions.RuntimeContext -import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer} -import org.elasticsearch.action.ActionRequest -import org.elasticsearch.client.Requests - -/** - * Sink to ElasticSearch to store the prediction values - * @param esIndex ElasticSearch Index name - * @param esType ElasticSearch Index type - */ -class SensorEventElasticSearchSinkFunction(esIndex: String, esType: String) extends ElasticsearchSinkFunction[SensorEvent]{ - - def createIndexRequest(element: SensorEvent): ActionRequest = { - val json = element.toJSONString() - - Requests.indexRequest - .index(esIndex) - .`type`(esType) - .source(json) - } - - override def process(element: SensorEvent, ctx: RuntimeContext, indexer: RequestIndexer): Unit = { - indexer.add(createIndexRequest(element)) - } - -} diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/sources/FileSource.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/sources/FileSource.scala index 2938a47..e3c4140 100644 --- a/src/main/scala/com/bhaskardivya/projects/smartgrid/sources/FileSource.scala +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/sources/FileSource.scala @@ -1,18 +1,32 @@ package com.bhaskardivya.projects.smartgrid.sources -import org.apache.flink.api.java.io.TextInputFormat +import com.bhaskardivya.projects.smartgrid.model.SensorEvent import org.apache.flink.api.java.utils.ParameterTool -import org.apache.flink.core.fs.Path -import org.apache.flink.streaming.api.functions.source.FileProcessingMode import org.apache.flink.streaming.api.scala._ class FileSource { - def getSource(env: StreamExecutionEnvironment, params: ParameterTool): DataStream[String] = { - 
val filePath: String = params.get("filePath", "/run/media/osboxes/Data/data/sorted100M.1s.hsum.csv") - val fileInterval: Long = params.getLong("fileInterval", 100) + def getSimulatedCSVSource(env: StreamExecutionEnvironment, params: ParameterTool): DataStream[SensorEvent] = { + val data = params.get("input", "/data/data.gz") + val maxServingDelay = params.getInt("maxServingDelay", 0) + val servingSpeedFactor = params.getFloat("servingSpeedFactor", 1f) + val offsetEventTimestamp = params.has("offsetEventTimestamp") - env.readFile(new TextInputFormat(new Path(filePath)), filePath, FileProcessingMode.PROCESS_ONCE, fileInterval) + val events = env.addSource(new CSVFileSource(data, maxServingDelay, servingSpeedFactor, offsetEventTimestamp)) + .name("CSV GZ File") + + events + } + + def getSource(env: StreamExecutionEnvironment, params: ParameterTool): DataStream[SensorEvent] = { + val filePath: String = params.get("input", "/data/data.gz") + + // read the CSV GZ and assign Timestamp + val csv: DataStream[SensorEvent] = env.readTextFile(filePath) + .map[SensorEvent](line => SensorEvent.fromString(line)) + .assignTimestampsAndWatermarks(SensorEvent.tsAssigner()) + + csv } } diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/sources/HBaseMedianSource.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/sources/HBaseMedianSource.scala index 2e90489..da25d4a 100644 --- a/src/main/scala/com/bhaskardivya/projects/smartgrid/sources/HBaseMedianSource.scala +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/sources/HBaseMedianSource.scala @@ -28,23 +28,23 @@ object HBaseMedianSource extends Serializable{ val scan = new Scan() if (avg != null && avg.key != null) { println("HBaseMedianSource | getMedian | ColumnQualifier is set") - scan.addColumn(columnFamily.getBytes(), avg.key.toColumnString().getBytes()) + scan.addColumn(columnFamily.getBytes(), avg.key.toColumnString.getBytes) } else { scan.addFamily(columnFamily.getBytes()) } - 
println("HBaseMedianSource | getMedian | Fetching Median " + table + " | " + columnFamily + " | " + avg.key.toColumnString()) + println("HBaseMedianSource | getMedian | Fetching Median " + table + " | " + columnFamily + " | " + avg.key.toColumnString) //println("BD | " + conf.toString) try { val aggregationClient: AggregationClient = new AggregationClient(conf) val median = aggregationClient.median(TableName.valueOf(table), new DoubleColumnInterpreter(), scan) - println("HBaseMedianSource | getMedian | Median Fetched " + table + " | " + columnFamily + " | " + avg.key.toColumnString()) + println("HBaseMedianSource | getMedian | Median Fetched " + table + " | " + columnFamily + " | " + avg.key.toColumnString) if (median == null) return 0.0 median / 1000.0 }catch { case e: Exception => println("Exception fetching median" + e) }finally { - println("GetMedian Took " + (System.currentTimeMillis() - startTime) + "ms for " + avg.key.toColumnString()) + println("GetMedian Took " + (System.currentTimeMillis() - startTime) + "ms for " + avg.key.toColumnString) } 0.0 } diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/sources/KafkaSource.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/sources/KafkaSource.scala index 8daf230..ee2e9c4 100644 --- a/src/main/scala/com/bhaskardivya/projects/smartgrid/sources/KafkaSource.scala +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/sources/KafkaSource.scala @@ -19,6 +19,7 @@ class KafkaSource() { val topic = params.get("topic", "test") val server = params.get("bootstrap.servers", "localhost:9092") val groupId = params.get("group.id", "test-" + System.currentTimeMillis().toString) + val startFromEarliest = params.getBoolean("start-from-earliest", true) // Create properties map for Kafka val properties = new Properties() @@ -26,9 +27,15 @@ class KafkaSource() { properties.setProperty("group.id", groupId) val consumer = new FlinkKafkaConsumer011[SensorEvent](topic, SensorEvent.schema(env.getConfig), 
properties) - consumer.setStartFromEarliest() - LOG.info("Created Source from kafka - Topic: {}, Server: {}, Consumer Group: {}", topic, server, groupId); + // Always start from the earliest kafka offset + if(startFromEarliest) + consumer.setStartFromEarliest() + + // Assign the Timestamp and Watermark from the SensorEvent's timestamp field + consumer.assignTimestampsAndWatermarks(SensorEvent.tsAssigner()) + + LOG.info("Created Source from kafka - Topic: {}, Server: {}, Consumer Group: {}", topic, server, groupId) env.addSource(consumer) } diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/util/JSONTrait.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/util/JSONTrait.scala new file mode 100644 index 0000000..1e09279 --- /dev/null +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/util/JSONTrait.scala @@ -0,0 +1,7 @@ +package com.bhaskardivya.projects.smartgrid.util + +trait JSONTrait { + + def toJSONString() : String + +} diff --git a/src/main/scala/com/bhaskardivya/projects/smartgrid/util/TDigestMedian.scala b/src/main/scala/com/bhaskardivya/projects/smartgrid/util/TDigestMedian.scala new file mode 100644 index 0000000..2e09de1 --- /dev/null +++ b/src/main/scala/com/bhaskardivya/projects/smartgrid/util/TDigestMedian.scala @@ -0,0 +1,21 @@ +package com.bhaskardivya.projects.smartgrid.util + +import com.tdunning.math.stats.TDigest +import java.io.Serializable + +@SerialVersionUID(1L) +class TDigestMedian() extends Serializable { + private var totalDigest: TDigest = _ + + this.setTotalDigest(TDigest.createDigest(100)) + + def getTotalDigest: TDigest = totalDigest + + def setTotalDigest(totalDigest: TDigest) { + this.totalDigest = totalDigest + } + + def addDigest(digest: Double): Unit = this.totalDigest.add(digest) + + def getMedian: Double = this.totalDigest.quantile(0.5) +} \ No newline at end of file