
spark-java-quick-start

Examples to start using Spark with Java.

Note: Spark runs on Java 8; make sure you have it installed on your system.

Project structure

Root Package (com.quantiaconsulting.sjqs)

In the various sub-packages you can find the exercises to start using Spark with the Java APIs.

Solutions Package (com.quantiaconsulting.sjqs.solutions)

In this package you can find the solutions to the exercises.

Utils

Run the solutions

spark-submit

The spark-submit script is used to launch applications on a cluster.

doc

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

The most common options are:

  • --class: The entry point for your application (e.g. org.apache.spark.examples.SparkPi)
  • --master: The master URL for the cluster (e.g. spark://23.195.26.187:7077)
  • --deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client)
  • --conf: Arbitrary Spark configuration property in key=value format. For values that contain spaces, wrap "key=value" in quotes.
  • application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside your cluster, for instance an hdfs:// path or a file:// path that is present on all nodes.
  • application-arguments: Arguments passed to the main method of your main class, if any.

Instructions:

  • Install Spark 2.4.4
  • Clone the repository
  • Go to the project root (the folder containing pom.xml) and compile the project (mvn package)
  • Copy the resources folder into the new target folder (the resources must be in the same folder as the jar): cp -r resources/ target/resources/
  • Go to the Spark home folder (the main folder created when you unzipped the Spark distribution)
  • Run spark-submit on a class of your choice:
./bin/spark-submit \
  --class com.quantiaconsulting.sjqs.basics.codeBrowsing.SparkHelloWorld \
  --master local[*] \
  <absolute path of your jar>/sjqs-1.0.jar

Note: make sure to use an absolute path for <absolute path of your jar>.

Maven additional repository

Spark Introduction

spark

First Application - Word Count

code

Java Code complexity

Focus on the complexity of the Java code compared to Python:

  • words.mapToPair(s -> new Tuple2<>(s, 1)) vs. .map(lambda s: (s, 1))
  • mapToPair vs. map
  • new Tuple2<>(s, 1) vs. (s, 1)
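
To make the comparison concrete, here is a minimal RDD word-count sketch in Java; the class name and input path are placeholders, not part of the repository:

import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public class WordCountSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("WordCountSketch")
            .master("local[*]")
            .getOrCreate();
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

        JavaRDD<String> lines = sc.textFile("resources/input.txt"); // placeholder path
        JavaRDD<String> words = lines.flatMap(l -> Arrays.asList(l.split(" ")).iterator());
        // Java needs mapToPair and an explicit Tuple2 where Python just writes (s, 1)
        JavaPairRDD<String, Integer> counts = words
            .mapToPair(s -> new Tuple2<>(s, 1))
            .reduceByKey((a, b) -> a + b);

        counts.collect().forEach(t -> System.out.println(t._1 + ": " + t._2));
        spark.stop();
    }
}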

Collect for RDD

doc

counts.collect() only works if the counts RDD is small enough to fit in the driver's memory.
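
Continuing the hypothetical counts pair RDD from the word-count sketch above, take(n) is the usual escape hatch when the full result may be too large for the driver:

import java.util.List;
import scala.Tuple2;

// materializes the whole RDD on the driver: safe only for small results
List<Tuple2<String, Integer>> all = counts.collect();
// fetches just the first n elements, whatever the RDD size
List<Tuple2<String, Integer>> firstTen = counts.take(10);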

Data Ingestion

CSV

Without schema

code

Focus on .option("header",true)

Automatically infer schema

code

Focus on .option("inferSchema",true)

Manually specify schema

code
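
A minimal sketch contrasting the three approaches side by side; the path and the column names (borrowed from the Basics example further down) are illustrative, and spark is assumed to be an existing SparkSession:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

String path = "resources/data.csv"; // placeholder path

// 1. No schema: every column arrives as a string
Dataset<Row> noSchemaDF = spark.read().option("header", true).csv(path);

// 2. Inferred schema: correct types, but the file is scanned a second time
Dataset<Row> inferredDF = spark.read()
    .option("header", true)
    .option("inferSchema", true)
    .csv(path);

// 3. Manual schema: correct types with a single scan
StructType schema = DataTypes.createStructType(new StructField[]{
    DataTypes.createStructField("timestamp", DataTypes.StringType, true),
    DataTypes.createStructField("site", DataTypes.StringType, true),
    DataTypes.createStructField("requests", DataTypes.IntegerType, true)
});
Dataset<Row> manualDF = spark.read().option("header", true).schema(schema).csv(path);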

Challenge

Instructions:

  • Use the class code to start
  • Read the file /resources/2015_02_clickstream.tsv and assign it to a DataFrame named testDF with the right schema using what you have learned (src)

Data Schema:

  • prev_id: integer
  • curr_id: integer
  • n: integer
  • prev_title: string
  • curr_title: string
  • type: string

Parquet

Parquet is a free and open-source column-oriented data storage format from the Apache Hadoop ecosystem -> doc

A Parquet file already contains metadata with the data schema, the number of rows, etc.

Read a Parquet file as a DataFrame -> spark.read().parquet(<parquet file path>);
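
Because the schema travels inside the file, no options are needed on read. A short sketch reusing the spark session from the sketches above (the path is a placeholder):

Dataset<Row> parquetDF = spark.read().parquet("resources/data.parquet");
parquetDF.printSchema(); // the schema comes from the file's metadata, not from inference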

Wrap-up

  • CSV Ingestion
    • No schema: all fields are read as strings
    • Inferred schema: the correct data types are inferred by the system, but inference costs a second scan of the file
    • Manually provided schema: correct data types with a single scan
  • Parquet Ingestion: columnar format, no schema specification needed

Data Exploration and Preparation

Basics

code

Focus on:

  • Filter
Dataset<Row> partialres = tempDF.where("requests >= 1000");
System.out.println(partialres.count() + " rows with requests >= 1000 ");

  • Projection
Dataset<Row> partialres = tempDF.where("requests >= 1000").select("timestamp","requests");
  • Order
Dataset<Row> partialres3 = partialres2.orderBy("requests");

Or using the col static function:

import static org.apache.spark.sql.functions.col;
Dataset<Row> partialres3 = partialres2.orderBy(col("requests").desc_nulls_last());
  • Using tempview and SQL
tempDF.createOrReplaceTempView("richieste");
Dataset<Row> sqlRes = spark.sql("SELECT timestamp, requests FROM richieste WHERE requests >= 1000 ORDER BY requests DESC");
  • Physical Plan
== Physical Plan ==
*(2) Sort [requests#2 DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(requests#2 DESC NULLS LAST, 200)
   +- *(1) Filter (isnotnull(requests#2) && (requests#2 >= 1000))
      +- InMemoryTableScan [timestamp#0, requests#2], [isnotnull(requests#2), (requests#2 >= 1000)]
            +- InMemoryRelation [timestamp#0, site#1, requests#2], StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- *(1) FileScan parquet [timestamp#0,site#1,requests#2] Batched: true, Format: Parquet, Location: InMem

Note: the DataFrame API and SQL produce the same physical plan.

Advanced - Columns and Unix Timestamp

code

Focus On:

  • Add new columns: use withColumn(...) -> doc
  • Transform a date/datetime in Unix Timestamp: use unix_timestamp(...) -> doc
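
A minimal sketch combining the two, assuming tempDF (from the Basics section above) has a string column timestamp in yyyy-MM-dd HH:mm:ss format; both the column name and the format are assumptions:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.unix_timestamp;

// adds a column holding the timestamp as seconds since the Unix epoch
Dataset<Row> withTsDF = tempDF.withColumn(
    "unix_ts", unix_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss"));
withTsDF.show(5);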

Advanced - Date Functions

code

Focus On:

  • More Functions! Extract year and month from a date/datetime: use year(...) and month(...) -> doc
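
A short continuation of the timestamp sketch above:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.month;
import static org.apache.spark.sql.functions.year;

// extracts calendar parts from the timestamp column into new columns
Dataset<Row> datePartsDF = withTsDF
    .withColumn("year", year(col("timestamp")))
    .withColumn("month", month(col("timestamp")));
datePartsDF.show(5);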

Advanced - Group and Date Format

code

Focus On:

  • Group rows and aggregate: use groupBy(...) and agg(...)
  • Format a date/datetime for output: use date_format(...)
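
A minimal sketch of the two together, reusing the hypothetical tempDF from the Basics section: count the requests per day by formatting the timestamp first, then grouping on the formatted column:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.date_format;
import static org.apache.spark.sql.functions.sum;

Dataset<Row> dailyDF = tempDF
    .withColumn("day", date_format(col("timestamp"), "yyyy-MM-dd"))
    .groupBy("day")
    .agg(sum("requests").alias("totalRequests"))
    .orderBy("day");
dailyDF.show(5);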

Advanced - Join

Join Intro

code

doc from slide 2.16 to slide 2.20

doc from slide 4.3 to slide 4.21

Focus On:

  • Join DataFrames: join(...) -> doc
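
A minimal sketch of the explicit-condition form of join, with hypothetical clicksDF and pagesDF DataFrames:

// inner join on an explicit column-equality condition
Dataset<Row> joinedDF = clicksDF.join(
    pagesDF,
    clicksDF.col("curr_id").equalTo(pagesDF.col("page_id")),
    "inner");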

Challenge

Instructions:

  • Use the class code to start
  • Read the file /resources/2015_02_clickstream.tsv and assign it to a DataFrame named testDF with the right schema using what you have learned (src)
  • Find the top 10 most-used paths with two jumps

Hints:

  • Read the same file into two different DataFrames (create the schema manually)
  • Rename the columns of one of the two DataFrames (use .withColumnRenamed(...) -> doc)
  • Join the two DataFrames using the curr_id of the original DataFrame and the prev_id of the renamed one (see the sketch after this list)
  • Sum the number of clicks of the two paths (the value in column n of the original DataFrame + the value in column n of the renamed DataFrame). Try to use expr(...) -> doc
  • Order descending by the sum
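
A sketch of the rename-then-join pattern only, not the full solution; testDF follows the schema below:

import static org.apache.spark.sql.functions.expr;

// rename the columns that will be referenced after the join
// (the title columns would need renaming too before being selected)
Dataset<Row> renamedDF = testDF
    .withColumnRenamed("prev_id", "prev_id2")
    .withColumnRenamed("curr_id", "curr_id2")
    .withColumnRenamed("n", "n2");

// second jump: the current page of the first jump is the previous page of the second
Dataset<Row> twoJumpsDF = testDF
    .join(renamedDF, testDF.col("curr_id").equalTo(renamedDF.col("prev_id2")))
    .withColumn("totalClicks", expr("n + n2"));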

Data Schema:

  • prev_id: integer
  • curr_id: integer
  • n: integer
  • prev_title: string
  • curr_title: string
  • type: string

Machine Learning

ML intro

A case of text analytics that uses a Decision Tree as a classifier.

code

Focus on:

  • RegexTokenizer -> doc
  • StopWordsRemover -> doc
  • Binarizer -> doc
  • Add more stopwords:
// start from the default stop word list and append a custom token
ArrayList<String> stopwordsList = new ArrayList<>(Arrays.asList(remover.getStopWords()));
stopwordsList.add("br");
String[] newStopWords = stopwordsList.toArray(new String[0]);
remover = new StopWordsRemover()
    .setInputCol("tokens")
    .setOutputCol("stopWordFree")
    .setStopWords(newStopWords);
  • DecisionTreeClassifier -> doc
  • Max Depth -> doc
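
Putting the pieces above into a Pipeline, a hedged sketch of one possible arrangement; HashingTF as the feature extractor, the text/score column names, and the threshold are assumptions, not necessarily what the class code does:

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.DecisionTreeClassifier;
import org.apache.spark.ml.feature.Binarizer;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.RegexTokenizer;
import org.apache.spark.ml.feature.StopWordsRemover;

RegexTokenizer tokenizer = new RegexTokenizer()
    .setInputCol("text").setOutputCol("tokens").setPattern("\\W+");
StopWordsRemover remover = new StopWordsRemover()
    .setInputCol("tokens").setOutputCol("stopWordFree");
HashingTF tf = new HashingTF()  // assumed feature extractor
    .setInputCol("stopWordFree").setOutputCol("features").setNumFeatures(10000);
Binarizer binarizer = new Binarizer()  // turns a numeric score into a 0/1 label
    .setInputCol("score").setOutputCol("label").setThreshold(5.0);
DecisionTreeClassifier dt = new DecisionTreeClassifier()
    .setLabelCol("label").setFeaturesCol("features").setMaxDepth(5);

Pipeline pipeline = new Pipeline().setStages(
    new PipelineStage[]{tokenizer, remover, tf, binarizer, dt});
PipelineModel model = pipeline.fit(trainDF);  // trainDF is hypothetical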

A case of predictive analytics using a Decision Tree or a Random Forest as a regressor

code
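
For the hyper-parameter tuning asked for in the challenge below, a minimal sketch of a cross-validated grid search around a RandomForestRegressor; it assumes the features have already been assembled into a features vector column next to a label column:

import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.regression.RandomForestRegressor;
import org.apache.spark.ml.tuning.CrossValidator;
import org.apache.spark.ml.tuning.CrossValidatorModel;
import org.apache.spark.ml.tuning.ParamGridBuilder;

RandomForestRegressor rf = new RandomForestRegressor()
    .setLabelCol("label").setFeaturesCol("features");

// every combination in the grid is evaluated with 3-fold cross validation
ParamMap[] grid = new ParamGridBuilder()
    .addGrid(rf.maxDepth(), new int[]{5, 10})
    .addGrid(rf.numTrees(), new int[]{20, 50})
    .build();

CrossValidator cv = new CrossValidator()
    .setEstimator(rf)
    .setEvaluator(new RegressionEvaluator().setMetricName("rmse"))
    .setEstimatorParamMaps(grid)
    .setNumFolds(3);

CrossValidatorModel model = cv.fit(trainDF);  // trainDF is hypothetical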

Challenge

Instructions:

  • Use the class code to start
  • Read the file /resources/pwJoin.parquet and assign it to a DataFrame.
  • Predict the number of free parking per hour:
    • Use at least two models of your choice
    • Show that you understand hyper-parameter tuning

Data Description: the Parquet file contains data about a parking area in Como and the local weather, covering the period from 01/08/2016 to 01/05/2017.

Data Schema:

  • temporalId: long
  • parkingId: integer
  • parkingName: string
  • freeParking: double
  • description: string
  • icon: string
  • temperature: double
  • month: integer
  • doy: integer
  • dom: integer
  • dow: string
  • hour: integer
