Examples to start using Spark with Java
Note: Spark runs on Java 8, make sure to have it installed on your system
In the various sub-packages you can find the exercises to start using Spark with the Java APIs.
In this package you can find the solutions to the exercises.
The spark-submit script is used to launch applications on a cluster.
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
The most common options are:
- --class: The entry point for your application (e.g. org.apache.spark.examples.SparkPi)
- --master: The master URL for the cluster (e.g. spark://23.195.26.187:7077)
- --deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client)
- --conf: Arbitrary Spark configuration property in key=value format. For values that contain spaces wrap “key=value” in quotes (as shown).
- application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside your cluster, for instance an hdfs:// path or a file:// path that is present on all nodes.
- application-arguments: Arguments passed to the main method of your main class, if any.
Instructions:
- Install Spark 2.4.4:
  - download Spark
  - unzip it on your system
- Clone the repository
- Go to the pom folder and compile the project (mvn package)
- Copy the resources folder into the new target folder (the resources must be in the same folder as the jar): cp -r resources/ target/resources/
- Go to the Spark home folder (the main folder created during the unzip operation)
- Run spark-submit on a class of your choice:
./bin/spark-submit \
--class com.quantiaconsulting.sjqs.basics.codeBrowsing.SparkHelloWorld \
--master local[*] \
<absolute path of your jar>/sjqs-1.0.jar
Note: Make sure to use an absolute path for <<your path>>
- If you need to add an additional repository to the Maven pom, please refer to the Maven documentation on adding repositories.
Focus on the complexity of the Java code compared to Python:
- words.mapToPair(s -> new Tuple2<>(s, 1)) vs. .map(lambda s: (s, 1))
- mapToPair vs. map
- new Tuple2<>(s, 1) vs. (s, 1)
counts.collect() only works if the counts RDD is small enough to fit in the driver memory.
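For reference, a minimal Java word-count sketch that puts these pieces together (assuming an existing SparkSession named spark; the input path is a placeholder; requires imports of JavaRDD, JavaPairRDD, scala.Tuple2 and java.util.Arrays):
// Hypothetical input path; replace with a real text file
JavaRDD<String> words = spark.read().textFile("<path to a text file>").javaRDD()
        .flatMap(line -> Arrays.asList(line.split(" ")).iterator());

// Java needs mapToPair and an explicit Tuple2; Python's map just returns (s, 1)
JavaPairRDD<String, Integer> counts = words
        .mapToPair(s -> new Tuple2<>(s, 1))
        .reduceByKey((a, b) -> a + b);

// collect() brings every element to the driver: only safe when counts is small
for (Tuple2<String, Integer> t : counts.collect()) {
    System.out.println(t._1 + ": " + t._2);
}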
Focus on .option("header",true)
Focus on .option("inferSchema",true)
Instructions:
- Use the class code to start
- Read the file /resources/2015_02_clickstream.tsv and assign it to a DataFrame named testDF with the right schema, using what you have learned (src); a possible sketch follows the schema below
Data Schema:
- prev_id: integer
- curr_id: integer
- n: integer
- prev_title: string
- curr_title: string
- type: string
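One possible way to build the schema manually and read the file (a sketch, not necessarily the repository's solution; it assumes an existing SparkSession named spark, a header line in the file, and static imports from org.apache.spark.sql.types.DataTypes):
// Define the schema by hand: correct types with a single scan of the file
StructType schema = createStructType(new StructField[]{
        createStructField("prev_id", IntegerType, true),
        createStructField("curr_id", IntegerType, true),
        createStructField("n", IntegerType, true),
        createStructField("prev_title", StringType, true),
        createStructField("curr_title", StringType, true),
        createStructField("type", StringType, true)
});

Dataset<Row> testDF = spark.read()
        .option("header", true)   // assumption: the file has a header line
        .option("sep", "\t")      // the file is tab-separated (.tsv)
        .schema(schema)
        .csv("resources/2015_02_clickstream.tsv");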
Parquet is a free and open-source column-oriented data storage format; it is part of the Apache Hadoop ecosystem -> doc
A Parquet file already contains metadata with the data schema, the number of rows, etc.
Read a parquet file as Dataframe -> spark.read().parquet(<parquet file path>);
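For example (a sketch assuming an existing SparkSession named spark and a placeholder file path):
// No schema or options needed: Parquet carries its own metadata
Dataset<Row> parquetDF = spark.read().parquet("<parquet file path>");
parquetDF.printSchema();   // schema read from the file metadata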
- CSV Ingestion:
  - No schema: all the fields are considered as String
  - Infer schema: the correct data types are inferred by the system, but the inference costs a double scan of the file
  - Manually provided schema: correct data types and a single scan
- Parquet Ingestion: columnar format, no need for schema specification
Focus on:
- Filter
Dataset<Row> partialres = tempDF.where("requests >= 1000");
System.out.println(partialres.count() + " rows with requests >= 1000 ");
- Projection
Dataset<Row> partialres = tempDF.where("requests >= 1000").select("timestamp","requests");
- Order
Dataset<Row> partialres3 = partialres2.orderBy("requests");
Or using the col static function:
import static org.apache.spark.sql.functions.col;
Dataset<Row> partialres3 = partialres2.orderBy(col("requests").desc_nulls_last());
- Using tempview and SQL
tempDF.createOrReplaceTempView("richieste");
Dataset<Row> sqlRes = spark.sql("SELECT timestamp, requests FROM richieste WHERE requests >= 1000 ORDER BY requests DESC");
- Physical Plan
== Physical Plan ==
*(2) Sort [requests#2 DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(requests#2 DESC NULLS LAST, 200)
+- *(1) Filter (isnotnull(requests#2) && (requests#2 >= 1000))
+- InMemoryTableScan [timestamp#0, requests#2], [isnotnull(requests#2), (requests#2 >= 1000)]
+- InMemoryRelation [timestamp#0, site#1, requests#2], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) FileScan parquet [timestamp#0,site#1,requests#2] Batched: true, Format: Parquet, Location: InMem
Note: Using the DataFrame API and SQL produces the same physical plan.
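You can check this yourself with explain(), which prints the physical plan (using the partialres2 DataFrame and the richieste temp view from the snippets above):
// Same plan from the DataFrame API ...
partialres2.orderBy(col("requests").desc_nulls_last()).explain();
// ... and from SQL on the temp view
spark.sql("SELECT timestamp, requests FROM richieste WHERE requests >= 1000 ORDER BY requests DESC").explain();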
Focus On:
- Add new columns: use withColumn(...) -> doc
- Transform a date/datetime into a Unix timestamp: use unix_timestamp(...) -> doc
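A minimal sketch of both, assuming the tempDF used above and that its timestamp column is a string formatted like "yyyy-MM-dd HH:mm:ss" (the pattern is an assumption), with a static import of org.apache.spark.sql.functions.unix_timestamp:
// Add a new column holding the Unix timestamp of the timestamp column
Dataset<Row> withTs = tempDF.withColumn(
        "unix_ts",
        unix_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss"));
withTs.show(5);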
Focus On:
- More Functions! Extract year and month from a date/datetime: use year(...) and month(...) -> doc
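For example, continuing from the previous sketch (static imports of org.apache.spark.sql.functions.year and month):
// Derive year and month columns from the timestamp column
Dataset<Row> withYearMonth = withTs
        .withColumn("year", year(col("timestamp")))
        .withColumn("month", month(col("timestamp")));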
Focus On:
- Group and aggregate: use groupBy(...) and agg(...) -> groupBy-doc, agg-doc
- Format date: use date_format(...) -> doc, java date format doc
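A sketch combining the three, again on the hypothetical tempDF with timestamp and requests columns (static imports from org.apache.spark.sql.functions):
// Average requests per day of week; "E" formats the timestamp as Mon, Tue, ...
Dataset<Row> perDay = tempDF
        .withColumn("dow", date_format(col("timestamp"), "E"))
        .groupBy("dow")
        .agg(avg("requests").alias("avgRequests"))
        .orderBy(col("avgRequests").desc());
perDay.show();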
doc from slide 2.16 to slide 2.20
doc from slide 4.3 to slide 4.21
Focus On:
- Join DataFrames: join(...) -> doc
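The basic shape of a join between two DataFrames (leftDF and rightDF are hypothetical names):
// Inner join on an equality condition between columns of the two DataFrames
Dataset<Row> joined = leftDF.join(
        rightDF,
        leftDF.col("curr_id").equalTo(rightDF.col("prev_id")),
        "inner");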
Instructions:
- Use the class code to start
- Read the file /resources/2015_02_clickstream.tsv and assign it to a DataFrame named testDF with the right schema using what you have learned (src)
- Find the top 10 most used paths with 2 jumps
Hints (a solution sketch follows the schema below):
- Read the same file into two different DataFrames (create the schema manually)
- Rename the columns of one of the two DataFrames (use .withColumnRenamed(...) -> doc)
- Join the two DataFrames using the curr_id of the original DataFrame and the prev_id of the renamed one
- Sum the number of clicks of the two paths (the value in the column n of the original DataFrame + the value in the column n of the renamed DataFrame). Try to use expr(...) -> doc
- Order desc by the sum
Data Schema:
- prev_id: integer
- curr_id: integer
- n: integer
- prev_title: string
- curr_title: string
- type: string
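One possible sketch of the two-jump query, following the hints above (not necessarily the repository's solution; schema is the manually built StructType shown earlier, the _2 column suffix is an arbitrary naming choice, and expr/col are static imports from org.apache.spark.sql.functions):
// Read the clickstream file once and derive a renamed copy for the second jump
Dataset<Row> first = spark.read()
        .option("header", true).option("sep", "\t").schema(schema)
        .csv("resources/2015_02_clickstream.tsv");
Dataset<Row> second = first
        .withColumnRenamed("prev_id", "prev_id_2")
        .withColumnRenamed("curr_id", "curr_id_2")
        .withColumnRenamed("n", "n_2")
        .withColumnRenamed("prev_title", "prev_title_2")
        .withColumnRenamed("curr_title", "curr_title_2")
        .withColumnRenamed("type", "type_2");

// Chain two jumps (first.curr_id -> second.prev_id_2) and rank paths by total clicks
Dataset<Row> twoJumps = first
        .join(second, first.col("curr_id").equalTo(second.col("prev_id_2")))
        .withColumn("totalClicks", expr("n + n_2"))
        .orderBy(col("totalClicks").desc())
        .limit(10);
twoJumps.show(false);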
Focus on:
// Take the default stop words and add a custom one ("br") to the list
String[] stopwords = remover.getStopWords();
ArrayList<String> stopwordsList = new ArrayList<String>(Arrays.asList(stopwords));
stopwordsList.add("br");
String[] newStopWords = new String[stopwordsList.size()];
newStopWords = stopwordsList.toArray(newStopWords);
// Rebuild the remover with the extended stop word list
remover = new StopWordsRemover()
        .setInputCol("tokens")
        .setOutputCol("stopWordFree")
        .setStopWords(newStopWords);
Instructions:
- Use the class code to start
- Read the file /resources/pwJoin.parquet and assign it to a DataFrame.
- Predict the number of free parking spots per hour:
- Use at least two models of your choice
- Show that you understand hyper-parameter tuning (a minimal sketch follows the schema below)
Data Description: The parquet file contains data related to a parking lot in Como and the weather, during the period between 01/08/2016 and 01/05/2017.
Data Schema:
- temporalId: long
- parkingId: integer
- parkingName: string
- freeParking: double
- description: string
- icon: string
- temperature: double
- month: integer
- doy: integer
- dom: integer
- dow: string
- hour: integer
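As a starting point, a minimal sketch of one model with hyper-parameter tuning via cross-validation: a linear regression on a hand-picked numeric feature subset. The feature columns, grid values and split are all assumptions, a second model such as a RandomForestRegressor would follow the same Pipeline/CrossValidator pattern, and it assumes an existing SparkSession named spark (imports from org.apache.spark.ml and its feature, regression, tuning, evaluation and param packages omitted for brevity):
// Load the data and assemble a few numeric columns into a feature vector
Dataset<Row> data = spark.read().parquet("resources/pwJoin.parquet");
VectorAssembler assembler = new VectorAssembler()
        .setInputCols(new String[]{"temperature", "month", "doy", "dom", "hour"})
        .setOutputCol("features");

LinearRegression lr = new LinearRegression()
        .setLabelCol("freeParking")
        .setFeaturesCol("features");
Pipeline pipeline = new Pipeline()
        .setStages(new PipelineStage[]{assembler, lr});

// Hyper-parameter grid explored by the cross-validator
ParamMap[] grid = new ParamGridBuilder()
        .addGrid(lr.regParam(), new double[]{0.01, 0.1, 1.0})
        .addGrid(lr.elasticNetParam(), new double[]{0.0, 0.5, 1.0})
        .build();

// 3-fold cross-validation picks the combination with the lowest RMSE
RegressionEvaluator evaluator = new RegressionEvaluator()
        .setLabelCol("freeParking")
        .setMetricName("rmse");
CrossValidator cv = new CrossValidator()
        .setEstimator(pipeline)
        .setEstimatorParamMaps(grid)
        .setEvaluator(evaluator)
        .setNumFolds(3);

// Train on 80% of the data, evaluate the best model on the remaining 20%
Dataset<Row>[] splits = data.randomSplit(new double[]{0.8, 0.2}, 42);
CrossValidatorModel model = cv.fit(splits[0]);
System.out.println("Test RMSE: " + evaluator.evaluate(model.transform(splits[1])));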









