Apache Spark & Scala
1. Apache Spark:
- Fast & general engine for large scale data processing
- Spark can run on a Hadoop cluster to spread out massive data analysis and machine learning tasks
- Distributes massive data processing across a fleet of computers in parallel
- Spark is in most cases a better alternative to Hadoop’s MapReduce component, but Spark can run on top of a Hadoop cluster, taking advantage of its distributed file system (HDFS) and YARN cluster manager. Spark also has its own built-in cluster manager, allowing you to use it outside of Hadoop, but it is not a complete replacement for Hadoop. Spark and Hadoop can co-exist.
1.1 Course Material
- Install Java, IntelliJ, Scala Plugin, and course materials
- Install JDK 14 from Oracle Archive
- Install IntelliJ
- Add environment variable if using Windows OS
- Variable : HADOOP_HOME = c:\hadoop
- Add to path : %HADOOP_HOME%\bin
- https://www.sundog-education.com/sparkscala/
1.2 Spark Architecture Notes
- Driver Program - Spark Context
- Cluster Manager - Spark’s built-in cluster manager or Hadoop’s YARN cluster manager (orchestrates the cluster of computers)
- How to spin up the resources you need
- How to distribute the work
- Where to put the different jobs in the most optimal place
- Where the data is most accessible
- Multiple Executors - each with its own Cache and Tasks
- Spark can run on top of Hadoop but doesn’t have to
- Individual nodes (executors) each have their own cache and run their own tasks
- 10 to 100 times faster than Hadoop MapReduce
- Fast due to its DAG (Directed Acyclic Graph) engine, which optimizes workflows
- More flexible, and does more of its work in memory than MapReduce
- Mature technology & widely adopted
- Spark SQL on Datasets and DataFrames (higher level API)
- Resilient Distributed Dataset (original & lower level API)
- Components:
- Spark Core (lower level API)
- Spark Streaming (higher level API) - Real time monitoring & data analysis
- Spark SQL (higher level API) - Structured data
- MLlib (higher level API) - Distributed ML but somewhat limited
- GraphX (higher level API) - Graph / social network analysis, but not very well maintained
1.3 Why Scala?
- Spark itself is written in Scala
- Scala’s functional programming model is a good fit for distributed processing
- Forces you to write code in a way such that functions can be easily distributed across the entire cluster
- More likely to be easily parallelized
- Gives you fast performance (Scala compiles to Java bytecode)
- Less code & boilerplate stuff than Java
- Python is slower in comparison
- Not as slow as it used to be
But
- You probably don’t know Scala
- So let’s learn the basics
Python Equivalent
nums = sc.parallelize([1,2,3,4])
squared = nums.map(lambda x: x * x).collect()
Scala Equivalent
val nums = sc.parallelize(List(1,2,3,4))
val squared = nums.map(x => x * x).collect()
2. Scala Basics
- Yet another programming language
- Uniquely suited for Spark because it’s structured in a way that lends itself to distributed processing over a cluster
- Faster & more reliable than Python
- It’s what Spark is built with:
- New features are often Scala-first
- You can also program Spark with Python or Java
- Runs on top of the Java Virtual Machine (JVM)
- Can access Java classes
- Compiles down to Java byte code
- Functional programming
2.1 Declaring Immutable Values & Mutable Variables
// VALUES are immutable constants.
// Immutable - An object whose state cannot be changed after it has been defined
// Defined using one line or one atomic operation
val hello: String = "Hola!"
// Stick to immutable constants where possible - avoids thread-safety issues caused by multiple threads trying to change the same value at the same time
// VARIABLES are mutable
// Mutable - An object whose state can be changed after it has been defined
var helloThere: String = hello
helloThere = hello + " There!"
println(helloThere)
val immutableHelloThere = hello + " There"
println(immutableHelloThere)
// Data Types
val numberOne: Int = 1
val truth: Boolean = true
val letterA: Char = 'a'
val pi: Double = 3.14159265
val piSinglePrecision: Float = 3.14159265f
val bigNumber: Long = 123456789
val smallNumber: Byte = 127
println("Here is a mess: " + numberOne + truth + letterA + pi + bigNumber)
println(f"Pi is about $piSinglePrecision%.3f")
println(f"Zero padding on the left: $numberOne%05d")
println(s"I can use the s prefix to use variables like $numberOne $truth $letterA")
println(s"The s prefix isn't limited to variables; I can include any expression. Like ${1+2}")
val theUltimateAnswer: String = "To life, the universe, and everything is 42."
val pattern = """.* ([\d]+).*""".r
val pattern(answerString) = theUltimateAnswer
val answer = answerString.toInt
println(answer)
// Booleans
val isGreater = 1 > 2
val isLesser = 1 < 2
val impossible = isGreater & isLesser // bitwise and
val alsoImpossible = isGreater && isLesser // logical and
val anotherWay = isGreater || isLesser
val picard: String = "Picard"
val bestCaptain: String = "Picard"
val isBest: Boolean = picard == bestCaptain
2.2 Flow Control in Scala
// Flow control
// If / else:
if (1 > 3) println("Impossible!") else println("The world makes sense.")
if (1 > 3) {
println("Impossible!")
println("Really?")
} else {
println("The world makes sense.")
println("still.")
}
// Matching
val number = 2
number match {
case 1 => println("One")
case 2 => println("Two")
case 3 => println("Three")
case _ => println("Something else")
}
for (x <- 1 to 4) {
val squared = x * x
println(squared)
}
var x = 10
while (x >= 0) {
println(x)
x -= 1
}
x = 0
do { println(x); x+=1 } while (x <= 10)
// Expressions
{val x = 10; x + 20}
println({val x = 10; x + 20})
2.3 Functions in Scala
// Functions
// format def <function name>(parameter name: type...) : return type = { }
def squareIt(x: Int) : Int = {
x * x
}
def cubeIt(x : Int) : Int = {x * x * x}
println(squareIt(2))
println(cubeIt(3))
// Function which takes an integer and another function as parameter; then returns an integer
// Important concept in functional programming
def transformInt(x: Int, f: Int => Int): Int = {
f(x)
}
val result = transformInt(2, cubeIt)
println(result)
// Lambda Function, Anonymous Inline Function, Function Literal
transformInt(3, x => x * x * x)
transformInt(10, x => x / 2)
transformInt(2, x => {val y = x * 2; y * y})
2.4 Data Structures in Scala
// Data structures
// Tuples
// Immutable lists
val captainStuff = ("Picard", "Enterprise-D", "NCC-1701-D")
println(captainStuff)
// Refer to the individual fields with a ONE-BASED index
println(captainStuff._1)
println(captainStuff._2)
println(captainStuff._3)
// Create key-value pair
val picardsShip = "Picard" -> "Enterprise-D"
println(picardsShip._2)
// Tuples can hold items of different types
val aBunchOfStuff = ("Kirk", 1964, true)
// Lists
// Like a tuple, but more functionality
// Must be of same type
val shipList = List("Enterprise", "Defiant", "Voyager", "Deep Space Nine")
println(shipList(1))
// Unlike Tuples which have one-based indexing, Lists have zero-based indexing
println(shipList.head) // First item in list
println(shipList.tail) // All items excluding the first
for (ship <- shipList) {println(ship)}
// Map is very important in the world of functional programming
val backwardShips = shipList.map( (ship: String) => {ship.reverse})
for (ship <- backwardShips) {println(ship)}
// reduce() to combine together all the items in a collection using some function
val numberList = List(1, 2, 3, 4, 5)
val sum = numberList.reduce( (x: Int, y: Int) => x + y)
println(sum)
// returns 15
// filter() removes stuff
val iHateFives = numberList.filter( (x: Int) => x != 5)
// shorthand expression
val iHateThrees = numberList.filter(_ != 3)
// Concatenate lists
val moreNumbers = List(6,7,8)
val lotsOfNumbers = numberList ++ moreNumbers
val reversed = numberList.reverse
val sorted = reversed.sorted
val lotsOfDuplicates = numberList ++ numberList
val distinctValues = lotsOfDuplicates.distinct
val maxValue = numberList.max
val total = numberList.sum
val hasThree = iHateThrees.contains(3)
// MAPS (in other languages known as : dictionaries or key-value lookups)
val shipMap = Map("Kirk" -> "Enterprise", "Picard" -> "Enterprise-D", "Sisko" -> "Deep Space Nine", "Janeway" -> "Voyager")
println(shipMap("Janeway"))
println(shipMap.contains("Archer"))
// Exception handling
val archersShip = util.Try(shipMap("Archer")) getOrElse "Unknown"
println(archersShip)
3. Using Resilient Distributed Datasets
3.1 RDD - Original & Old API (Spark Roots)
- Resilient
- Distributed
- Dataset
3.1.1 Spark Context
- Created by your driver program
- Is responsible for making RDD’s resilient and distributed
- The Spark shell creates a “sc” object for you
val nums = sc.parallelize(List(1,2,3,4))
val lines = sc.textFile("file:///c:/users/frank/gobs-o-text.txt")
// (also works with s3n:// or hdfs:// URIs)
val hiveCtx = new HiveContext(sc)
val rows = hiveCtx.sql("SELECT name, age FROM users")
// Can also create RDD from JDBC, NoSQL Databases (Cassandra, HBase), Elasticsearch, JSON, .csv, various compressed datasets
3.1.2 Transforming RDD
- map
- flatMap
- filter
- distinct
- sample
- union, intersection, subtract, cartesian
val rdd = sc.parallelize(List(1,2,3,4))
// Functional Programming : many RDD methods accept functions
// Functional Programming : passing functions around
// Scala forcing you to write functions that can be distributed
val squares = rdd.map(x => x * x)
// or equivalently, pass a named function: rdd.map(squareIt)
// This yields 1, 4, 9, 16
3.1.3 Actions on RDD
- An action is something that causes your RDD to collapse and return a result back to your driver script (a quick sketch follows this list)
- collect
- count
- countByValue
- take
- top
- reduce
- Lazy Evaluation - Nothing actually happens in your driver program until an action is called
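As a quick illustration, here is a minimal sketch of a few of these actions (assuming a SparkContext sc is already available, as in the examples below):
val rdd = sc.parallelize(List(1, 2, 2, 3, 4))
// Nothing executes yet - parallelize and map are lazy transformations
val doubled = rdd.map(x => x * 2)
// Each action below triggers an actual job
val allValues = doubled.collect()           // Array(2, 4, 4, 6, 8)
val howMany = doubled.count()               // 5
val histogram = rdd.countByValue()          // Map(1 -> 1, 2 -> 2, 3 -> 1, 4 -> 1)
val firstTwo = doubled.take(2)              // Array(2, 4)
val topTwo = doubled.top(2)                 // Array(8, 6)
val total = doubled.reduce((x, y) => x + y) // 24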
3.1.4 Ratings histogram example
// Import what you need
package com.sundogsoftware.spark
// Import the packages that we need
import org.apache.spark._ // Everything from Spark
import org.apache.log4j._ // Everything from Log4j; Adjust our error level and logging
// Structure every driver script needs to conform to
/** Count up how many of each star rating exists in the MovieLens 100K data set. */
object RatingsCounter {
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkContext using every core of the local machine, named RatingsCounter
// local[*] means Spark will parallelize across every CPU core of the local machine
val sc = new SparkContext("local[*]", "RatingsCounter")
// Load up each line of the ratings data into an RDD
val lines = sc.textFile("data/ml-100k/u.data")
// Convert each line to a string, split it out by tabs, and extract the third field.
// (The file format is userID, movieID, rating, timestamp)
val ratings = lines.map(x => x.split("\t")(2))
// Count up how many times each value (rating) occurs
// Calling an action
val results = ratings.countByValue()
// Sort the resulting map of (rating, count) tuples
val sortedResults = results.toSeq.sortBy(_._1)
// Print each result on its own line.
sortedResults.foreach(println)
}
}
// Returns :
// (1,6110)
// (2,11370)
// (3,27145)
// (4,34174)
// (5,21201)
3.1.5 Spark Internals
- Spark creates an execution plan (a DAG) when an action is called
- The job is broken into stages (based on when data needs to be reorganized, i.e., shuffled), and stages are broken into tasks
- Finally, the tasks are scheduled across your cluster and executed (you can inspect this plan as sketched below)
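A small sketch of how to peek at that plan from your driver script (assuming an existing SparkContext sc): RDDs expose a toDebugString method, and the indentation in its output marks the stage boundaries where a shuffle occurs.
val lines = sc.textFile("data/ml-100k/u.data")
val counts = lines.map(x => (x.split("\t")(2), 1)).reduceByKey(_ + _)
// Print the RDD lineage; the shuffle introduced by reduceByKey starts a new stage
println(counts.toDebugString)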
3.1.6 Special RDDs : Key-Value RDDs
- For example: number of friends by age
- Key is age, Value is number of friends
- Instead of just a list of ages or a list of # of friends, we can store them as key-value RDDs
- Spark can do special stuff with key-value data
- You can also do SQL-style joins on key-value RDDs (possible, but rarely done in modern Spark, where DataFrames are preferred)
- You can map just the values of a key-value RDD with mapValues() (see the sketch after this list)
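A minimal sketch of those two ideas on some hypothetical key-value RDDs (the join is shown only for completeness):
val friendsByAge = sc.parallelize(List((33, 385), (33, 2), (55, 221)))
val namesByAge = sc.parallelize(List((33, "Alice"), (55, "Bob")))
// mapValues transforms only the values, leaving the keys (and partitioning) intact
val doubledFriends = friendsByAge.mapValues(numFriends => numFriends * 2)
// SQL-style inner join on the key: yields (age, (numFriends, name)) pairs
val joined = friendsByAge.join(namesByAge)
joined.collect().foreach(println)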
3.1.7 Example 1 : Average friends by Age
// key: x, value: 1
val totalsByAge = rdd.map(x => (x, 1))
// Spark can do special stuff with key-value data
reduceByKey((x, y) => x + y) : combine values with the same key using some function
groupByKey() : Group values with the same key
sortByKey() : Sort RDD by key values
keys(), values() : Create an RDD of just the keys, or just the values
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.log4j._
/** Compute the average number of friends by age in a social network. */
object FriendsByAge {
/** A function that splits a line of input into (age, numFriends) tuples. */
def parseLine(line: String): (Int, Int) = {
// Split by commas
val fields = line.split(",")
// Extract the age and numFriends fields, and convert to integers
val age = fields(2).toInt
val numFriends = fields(3).toInt
// Create a tuple that is our result.
(age, numFriends)
}
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkContext using every core of the local machine
val sc = new SparkContext("local[*]", "FriendsByAge")
// Load each line of the source data into an RDD
val lines = sc.textFile("data/fakefriends-noheader.csv")
// Use our parseLines function to convert to (age, numFriends) tuples
val rdd = lines.map(parseLine)
// Returns Tuples:
// 33, 385
// 33, 2
// 55, 221
// 40, 465
// Lots going on here...
// We are starting with an RDD of form (age, numFriends) where age is the KEY and numFriends is the VALUE
// We use mapValues to convert each numFriends value to a tuple of (numFriends, 1)
// Then we use reduceByKey to sum up the total numFriends and total instances for each age, by
// adding together all the numFriends values and 1's respectively.
val totalsByAge = rdd.mapValues(x => (x, 1)).reduceByKey( (x,y) => (x._1 + y._1, x._2 + y._2))
// rdd.mapValues(x => (x, 1))
// Returns:
// (33, (385, 1))
// (33, (2, 1))
// (55, (221, 1))
// (40, (465, 1))
// reduceByKey((x,y) => (x._1 + y._1, x._2 + y._2))
// Returns:
// (33, (387, 2))
// (55, (221, 1))
// (40, (465, 1))
// So now we have tuples of (age, (totalFriends, totalInstances))
// To compute the average we divide totalFriends / totalInstances for each age.
val averagesByAge = totalsByAge.mapValues(x => x._1.toDouble / x._2) // toDouble avoids integer division, so averages like 193.5 come out right
// Collect the results from the RDD (This kicks off computing the DAG and actually executes the job)
val results = averagesByAge.collect()
// Returns:
// (33, 193.5)
// (55, 221)
// (40, 465)
// Sort and print the final results.
results.sorted.foreach(println)
}
}
3.1.8 Example 2 : Minimum Temperature by Location
val minTemps = parsedLines.filter(x => x._2 == "TMIN")
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.log4j._
import scala.math.min
/** Find the minimum temperature by weather station */
object MinTemperatures {
def parseLine(line:String): (String, String, Float) = {
val fields = line.split(",")
val stationID = fields(0)
val entryType = fields(2)
val temperature = fields(3).toFloat * 0.1f * (9.0f / 5.0f) + 32.0f
(stationID, entryType, temperature)
}
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkContext using every core of the local machine
val sc = new SparkContext(master = "local[*]", appName = "MinTemperatures")
// Read each line of input data
val lines = sc.textFile(path = "data/1800.csv")
// Convert to (stationID, entryType, temperature) tuples
val parsedLines = lines.map(parseLine)
// Filter out all but TMIN entries
val minTemps = parsedLines.filter(x => x._2 == "TMIN")
// Convert to (stationID, temperature)
val stationTemps = minTemps.map(x => (x._1, x._3.toFloat))
// Reduce by stationID retaining the minimum temperature found
val minTempsByStation = stationTemps.reduceByKey( (x,y) => min(x,y))
// Collect, format, and print the results
val results = minTempsByStation.collect()
for (result <- results.sorted) {
val station = result._1
val temp = result._2
val formattedTemp = f"$temp%.2f F"
println(s"$station minimum temperature: $formattedTemp")
}
}
}
3.1.9 Example 3 : Counting Word Occurrences using Flatmap()
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.log4j._
/** Count up how many of each word appears in a book as simply as possible. */
object WordCount {
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkContext using every core of the local machine
val sc = new SparkContext("local[*]", "WordCount")
// Read each line of my book into an RDD
val input = sc.textFile("data/book.txt")
// Split into words separated by a space character
val words = input.flatMap(x => x.split(" "))
// Count up the occurrences of each word
val wordCounts = words.countByValue()
// Print the results.
wordCounts.foreach(println)
}
}
3.1.10 Example 4 : Improving the Word Count Script with Regular Expressions
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.log4j._
/** Count up how many of each word occurs in a book, using regular expressions. */
object WordCountBetter {
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkContext using every core of the local machine
val sc = new SparkContext("local[*]", "WordCountBetter")
// Load each line of my book into an RDD
val input = sc.textFile("data/book.txt")
// Split using a regular expression that extracts words
// Regular expression knows what a word is
val words = input.flatMap(x => x.split("\\W+"))
// Normalize everything to lowercase
val lowercaseWords = words.map(x => x.toLowerCase())
// Count of the occurrences of each word
val wordCounts = lowercaseWords.countByValue()
// Print the results
wordCounts.foreach(println)
}
}
3.1.11 Example 5 : Sorting the Word Count Results
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.log4j._
/** Count up how many of each word occurs in a book, using regular expressions and sorting the final results */
object WordCountBetterSorted {
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkContext using the local machine
val sc = new SparkContext("local", "WordCountBetterSorted")
// Load each line of my book into an RDD
val input = sc.textFile("data/book.txt")
// Split using a regular expression that extracts words
val words = input.flatMap(x => x.split("\\W+"))
// Normalize everything to lowercase
val lowercaseWords = words.map(x => x.toLowerCase())
// Count of the occurrences of each word
val wordCounts = lowercaseWords.map(x => (x, 1)).reduceByKey( (x,y) => x + y )
// Flip (word, count) tuples to (count, word) and then sort by key (the counts)
val wordCountsSorted = wordCounts.map( x => (x._2, x._1) ).sortByKey()
// Print the results, flipping the (count, word) results to word: count as we go.
for (result <- wordCountsSorted) {
val count = result._1
val word = result._2
println(s"$word: $count")
}
}
}
3.1.12 Example 6 : Total Amount Spent by Customer
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.log4j._
/** Compute the total amount spent per customer in some fake e-commerce data. */
object TotalSpentByCustomer {
/** Convert input data to (customerID, amountSpent) tuples */
def extractCustomerPricePairs(line: String): (Int, Float) = {
val fields = line.split(",")
(fields(0).toInt, fields(2).toFloat)
}
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkContext using every core of the local machine
val sc = new SparkContext("local[*]", "TotalSpentByCustomer")
val input = sc.textFile("data/customer-orders.csv")
val mappedInput = input.map(extractCustomerPricePairs)
val totalByCustomer = mappedInput.reduceByKey( (x,y) => x + y )
val results = totalByCustomer.collect()
// Print the results.
results.foreach(println)
}
}
3.1.13 Example 7 : Total Amount Spent by Customer, Sorted
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.log4j._
/** Compute the total amount spent per customer in some fake e-commerce data. */
object TotalSpentByCustomerSorted {
/** Convert input data to (customerID, amountSpent) tuples */
def extractCustomerPricePairs(line: String): (Int, Float) = {
val fields = line.split(",")
(fields(0).toInt, fields(2).toFloat)
}
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkContext using every core of the local machine
val sc = new SparkContext("local[*]", "TotalSpentByCustomerSorted")
val input = sc.textFile("data/customer-orders.csv")
val mappedInput = input.map(extractCustomerPricePairs)
val totalByCustomer = mappedInput.reduceByKey( (x,y) => x + y )
val flipped = totalByCustomer.map( x => (x._2, x._1) )
val totalByCustomerSorted = flipped.sortByKey()
val results = totalByCustomerSorted.collect()
// Print the results.
results.foreach(println)
}
}
4. SparkSQL, DataFrames, and Datasets
4.1 Intro to SparkSQL
- Modern API for Spark (layer above Spark core)
- DataFrames and DataSets
4.2 DataFrames
- Contain Row objects
- Can run SQL queries (since it has a schema)
- Has a schema
- Communicates with JDBC / ODBC and tools like Tableau
- Also benefits from Spark SQL’s query optimization
4.3 DataSets
- DataSets can explicitly wrap a given struct or type
- It knows what its columns are from the get-go
- A DataFrame’s schema is inferred at runtime; a Dataset’s schema is known at compile time
- Faster detection of errors and better optimization
- Datasets can only be used in compiled languages (Java, Scala), not in interpreted languages (Python)
- Yet another reason to use Scala
- RDD’s can be converted to DataSets with .toDS()
- RDDs are still useful for some operations
- The trend in Spark is to use RDD’s less and DataSets more
- DataSets are more efficient
- They can be serialized very efficiently - even better than Kryo
- Optimal execution plans can be determined at compile time
- DataSets allow for better interoperability
- MLlib and Spark Streaming are moving toward using Datasets instead of RDDs for their primary API
- DataSets simplify development
- You can perform most SQL operations on a dataset with one line
- DataSets are the new hotness
- Create a SparkSession object instead of a SparkContext when using Spark SQL / DataSets
- Spark SQL exposes a JDBC/ODBC server (if you built Spark with Hive support)
- Start it with sbin/start-thriftserver.sh
- Connect using beeline -u jdbc:hive2://localhost:10000
- Voilà, you have a SQL shell to Spark SQL
- You can create new tables, or query existing ones that were cached using hiveCtx.cacheTable("tableName")
// Other stuff you can do with datasets
myResultDataset.show()
myResultDataset.select("someFieldName")
myResultDataset.filter(myResultDataset("someFieldName") > 200)
myResultDataset.groupBy(myResultDataset("someFieldName")).mean()
myResultDataset.rdd.map(mapperFunction)
- You can create user-defined functions (UDFs)
import org.apache.spark.sql.functions.{col, udf}
val square = udf((x: Int) => x * x)
val squaredDF = df.withColumn("square", square(col("value")))
4.4 Let’s play with SparkSQL, DataFrames and DataSets : Fake Friends Data
Example 1 : Fake Friends data and the Spark SQL API
- Case Class - compact way of defining an object
- Constructs a virtual DB and schema for the data
- For Datasets, the schema can’t just be inferred at runtime; it has to be known at compile time (via the case class)
package com.sundogsoftware.spark
import org.apache.spark.sql._
import org.apache.log4j._
object SparkSQLDataset {
// Case Class - compact way of defining an object
case class Person(id:Int, name:String, age:Int, friends:Int)
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Use SparkSession interface
val spark = SparkSession
.builder
.appName("SparkSQL")
.master("local[*]")
.getOrCreate()
// Load each line of the source data into a Dataset
import spark.implicits._
val schemaPeople = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("data/fakefriends.csv")
// Up to this point we have a DataFrame; .as[Person] converts it into a Dataset by applying our schema at compile time
// (otherwise the schema would only be inferred at runtime)
.as[Person]
schemaPeople.printSchema()
schemaPeople.createOrReplaceTempView("people")
val teenagers = spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
val results = teenagers.collect()
results.foreach(println)
spark.stop()
}
}
Example 2 : DataFrames & DataSet
package com.sundogsoftware.spark
import org.apache.spark.sql._
import org.apache.log4j._
object DataFramesDataset {
case class Person(id:Int, name:String, age:Int, friends:Int)
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Use new SparkSession interface in Spark 2.0
val spark = SparkSession
.builder
.appName("SparkSQL")
.master("local[*]")
.getOrCreate()
// Convert our csv file to a DataSet, using our Person case
// class to infer the schema.
import spark.implicits._
val people = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("data/fakefriends.csv")
.as[Person]
// There are lots of other ways to make a DataFrame.
// For example, spark.read.json("json file path")
// or sqlContext.table("Hive table name")
println("Here is our inferred schema:")
people.printSchema()
println("Let's select the name column:")
people.select("name").show()
println("Filter out anyone over 21:")
people.filter(people("age") < 21).show()
println("Group by age:")
people.groupBy("age").count().show()
println("Make everyone 10 years older:")
people.select(people("name"), people("age") + 10).show()
spark.stop()
}
}
Example 3 : Friends by Age with Spark Datasets
package com.sundogsoftware.spark
import org.apache.log4j._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
/** Compute the average number of friends by age in a social network. */
object FriendsByAgeDataset {
// Create case class with schema of fakefriends.csv
case class FakeFriends(id: Int, name: String, age: Int, friends: Long)
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkSession using every core of the local machine
val spark = SparkSession
.builder
.appName("FriendsByAge")
.master("local[*]")
.getOrCreate()
// Load each line of the source data into a Dataset
import spark.implicits._
val ds = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("data/fakefriends.csv")
.as[FakeFriends]
// Select only age and numFriends columns
val friendsByAge = ds.select("age", "friends")
// From friendsByAge we group by "age" and then compute average
friendsByAge.groupBy("age").avg("friends").show()
// Sorted:
friendsByAge.groupBy("age").avg("friends").sort("age").show()
// Formatted more nicely:
friendsByAge.groupBy("age").agg(round(avg("friends"), 2))
.sort("age").show()
// With a custom column name:
friendsByAge.groupBy("age").agg(round(avg("friends"), 2)
.alias("friends_avg")).sort("age").show()
}
}
Example 4 : Word count by regex, sorting and DataSets
package com.sundogsoftware.spark
import org.apache.log4j._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
/** Count up how many of each word occurs in a book, using regular expressions and sorting the final results */
object WordCountBetterSortedDataset {
case class Book(value: String)
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkSession using every core of the local machine
val spark = SparkSession
.builder
.appName("WordCount")
.master("local[*]")
.getOrCreate()
// Read each line of my book into a Dataset
import spark.implicits._
// Implicitly infer schema (with no header, so the column name is just "value")
// and then convert the DataFrame to a Dataset (providing a schema at compile time)
val input = spark.read.text("data/book.txt").as[Book]
// Split using a regular expression that extracts words
val words = input
.select(explode(split($"value", "\\W+")).alias("word"))
.filter($"word" =!= "")
// Normalize everything to lowercase
val lowercaseWords = words.select(lower($"word").alias("word"))
// Count up the occurrences of each word
val wordCounts = lowercaseWords.groupBy("word").count()
// Sort by counts
val wordCountsSorted = wordCounts.sort("count")
// Show the results.
wordCountsSorted.show(wordCountsSorted.count.toInt)
// ANOTHER WAY TO DO IT (Blending RDD's and Datasets)
val bookRDD = spark.sparkContext.textFile("data/book.txt")
val wordsRDD = bookRDD.flatMap(x => x.split("\\W+"))
val wordsDS = wordsRDD.toDS()
val lowercaseWordsDS = wordsDS.select(lower($"value").alias("word"))
val wordCountsDS = lowercaseWordsDS.groupBy("word").count()
val wordCountsSortedDS = wordCountsDS.sort("count")
wordCountsSortedDS.show(wordCountsSortedDS.count.toInt)
}
}
Example 5 : Revisiting the Minimum Temperature
package com.sundogsoftware.spark
import org.apache.spark.sql.types.{FloatType, IntegerType, StringType, StructType}
import org.apache.log4j._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
/** Find the minimum temperature by weather station */
object MinTemperaturesDataset {
case class Temperature(stationID: String, date: Int, measure_type: String, temperature: Float)
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkSession using every core of the local machine
val spark = SparkSession
.builder
.appName("MinTemperatures")
.master("local[*]")
.getOrCreate()
val temperatureSchema = new StructType()
.add("stationID", StringType, nullable = true)
.add("date", IntegerType, nullable = true)
.add("measure_type", StringType, nullable = true)
.add("temperature", FloatType, nullable = true)
// Read the file as dataset
import spark.implicits._
val ds = spark.read
.schema(temperatureSchema)
.csv("data/1800.csv")
.as[Temperature]
// Filter out all but TMIN entries
val minTemps = ds.filter($"measure_type" === "TMIN")
// Select only stationID and temperature
val stationTemps = minTemps.select("stationID", "temperature")
// Aggregate to find minimum temperature for every station
val minTempsByStation = stationTemps.groupBy("stationID").min("temperature")
// Convert temperature to fahrenheit and sort the dataset
val minTempsByStationF = minTempsByStation
.withColumn("temperature", round($"min(temperature)" * 0.1f * (9.0f / 5.0f) + 32.0f, 2))
.select("stationID", "temperature").sort("temperature")
// Collect, format, and print the results
val results = minTempsByStationF.collect()
for (result <- results) {
val station = result(0)
val temp = result(1).asInstanceOf[Float]
val formattedTemp = f"$temp%.2f F"
println(s"$station minimum temperature: $formattedTemp")
}
}
}
Example 6 : Total Spent by Customer
package com.sundogsoftware.spark
import org.apache.log4j._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{round, sum}
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}
/** Compute the total amount spent per customer in some fake e-commerce data. */
object TotalSpentByCustomerSortedDataset {
case class CustomerOrders(cust_id: Int, item_id: Int, amount_spent: Double)
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
.builder
.appName("TotalSpentByCustomer")
.master("local[*]")
.getOrCreate()
// Create schema when reading customer-orders
val customerOrdersSchema = new StructType()
.add("cust_id", IntegerType,nullable = true)
.add("item_id", IntegerType,nullable = true)
.add("amount_spent", DoubleType,nullable = true)
// Load up the data into spark dataset
// Use default separator (,), load schema from customerOrdersSchema and force case class to read it as dataset
import spark.implicits._
val customerDS = spark.read
.schema(customerOrdersSchema)
.csv("data/customer-orders.csv")
.as[CustomerOrders]
val totalByCustomer = customerDS
.groupBy("cust_id")
.agg(round(sum("amount_spent"), 2)
.alias("total_spent"))
val totalByCustomerSorted = totalByCustomer.sort("total_spent")
totalByCustomerSorted.show(totalByCustomer.count.toInt)
}
}
5. Advanced Examples of Spark Programs
Example 1 : Find the Most Popular Movies
MovieLens data - userID, movieID, rating, timestamp
package com.sundogsoftware.spark
import org.apache.log4j._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, LongType, StructType}
/** Find the movies with the most ratings. */
object PopularMoviesDataset {
// Case class so we can get a column name for our movie ID
final case class Movie(movieID: Int)
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Use new SparkSession interface in Spark 2.0
val spark = SparkSession
.builder
.appName("PopularMovies")
.master("local[*]")
.getOrCreate()
// Create schema when reading u.data
val moviesSchema = new StructType()
.add("userID", IntegerType, nullable = true)
.add("movieID", IntegerType, nullable = true)
.add("rating", IntegerType, nullable = true)
.add("timestamp", LongType, nullable = true)
import spark.implicits._
// Load up movie data as dataset
val moviesDS = spark.read
.option("sep", "\t")
.schema(moviesSchema)
.csv("data/ml-100k/u.data")
.as[Movie]
// Some SQL-style magic to sort all movies by popularity in one line!
val topMovieIDs = moviesDS.groupBy("movieID").count().orderBy(desc("count"))
// Grab the top 10
topMovieIDs.show(10)
// Stop the session
spark.stop()
}
}
Example 2 : Use Broadcast Variables & Spark UDF to Display Movie Names
package com.sundogsoftware.spark
import org.apache.log4j._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{IntegerType, LongType, StructType}
import scala.io.{Codec, Source}
/** Find the movies with the most ratings. */
object PopularMoviesNicerDataset {
case class Movies(userID: Int, movieID: Int, rating: Int, timestamp: Long)
/** Load up a Map of movie IDs to movie names. */
def loadMovieNames() : Map[Int, String] = {
// Handle character encoding issues:
implicit val codec: Codec = Codec("ISO-8859-1") // This is the current encoding of u.item, not UTF-8.
// Create a Map of Ints to Strings, and populate it from u.item.
var movieNames:Map[Int, String] = Map()
val lines = Source.fromFile("data/ml-100k/u.item")
for (line <- lines.getLines()) {
val fields = line.split('|')
if (fields.length > 1) {
movieNames += (fields(0).toInt -> fields(1))
}
}
lines.close()
movieNames
}
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkSession using every core of the local machine
val spark = SparkSession
.builder
.appName("PopularMoviesNicer")
.master("local[*]")
.getOrCreate()
val nameDict = spark.sparkContext.broadcast(loadMovieNames())
// Create schema when reading u.data
val moviesSchema = new StructType()
.add("userID", IntegerType, nullable = true)
.add("movieID", IntegerType, nullable = true)
.add("rating", IntegerType, nullable = true)
.add("timestamp", LongType, nullable = true)
// Load up movie data as dataset
import spark.implicits._
val movies = spark.read
.option("sep", "\t")
.schema(moviesSchema)
.csv("data/ml-100k/u.data")
.as[Movies]
// Get number of reviews per movieID
val movieCounts = movies.groupBy("movieID").count()
// Create a user-defined function to look up movie names from our
// shared map variable.
// We start by declaring an "anonymous function" in Scala
val lookupName : Int => String = (movieID:Int)=>{
nameDict.value(movieID)
}
// Then wrap it with a udf
val lookupNameUDF = udf(lookupName)
// Add a movieTitle column using our new udf
val moviesWithNames = movieCounts.withColumn("movieTitle", lookupNameUDF(col("movieID")))
// Sort the results
val sortedMoviesWithNames = moviesWithNames.sort("count")
// Show the results without truncating it
sortedMoviesWithNames.show(sortedMoviesWithNames.count.toInt, truncate = false)
}
}
+-------+-----+---------------------------------------------------------------------------------+
|movieID|count|movieTitle |
+-------+-----+---------------------------------------------------------------------------------+
|121 |429 |Independence Day (ID4) (1996) |
|300 |431 |Air Force One (1997) |
|1 |452 |Toy Story (1995) |
|288 |478 |Scream (1996) |
|286 |481 |English Patient, The (1996) |
|294 |485 |Liar Liar (1997) |
|181 |507 |Return of the Jedi (1983) |
|100 |508 |Fargo (1996) |
|258 |509 |Contact (1997) |
|50 |583 |Star Wars (1977) |
+-------+-----+---------------------------------------------------------------------------------+
Example 3 : Most Popular Superhero in a Social Graph
with DataSet
package com.sundogsoftware.spark
import org.apache.log4j._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
/** Find the superhero with the most co-appearances. */
object MostPopularSuperheroDataset {
case class SuperHeroNames(id: Int, name: String)
case class SuperHero(value: String)
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkSession using every core of the local machine
val spark = SparkSession
.builder
.appName("MostPopularSuperhero")
.master("local[*]")
.getOrCreate()
// Create schema when reading Marvel-names.txt
val superHeroNamesSchema = new StructType()
.add("id", IntegerType, nullable = true)
.add("name", StringType, nullable = true)
// Build up a hero ID -> name Dataset
import spark.implicits._
val names = spark.read
.schema(superHeroNamesSchema)
.option("sep", " ")
.csv("data/Marvel-names.txt")
.as[SuperHeroNames]
val lines = spark.read
.text("data/Marvel-graph.txt")
.as[SuperHero]
val connections = lines
.withColumn("id", split(col("value"), " ")(0))
.withColumn("connections", size(split(col("value"), " ")) - 1)
.groupBy("id").agg(sum("connections").alias("connections"))
val mostPopular = connections
.sort($"connections".desc)
.first()
val mostPopularName = names
.filter($"id" === mostPopular(0))
.select("name")
.first()
println(s"${mostPopularName(0)} is the most popular superhero with ${mostPopular(1)} co-appearances.")
}
}
CAPTAIN AMERICA is the most popular superhero with 1937 co-appearances.
with RDD
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.log4j._
/** Find the superhero with the most co-appearances. */
object MostPopularSuperhero {
// Function to extract the hero ID and number of connections from each line
def countCoOccurrences(line: String): (Int, Int) = {
val elements = line.split("\\s+")
( elements(0).toInt, elements.length - 1 )
}
// Function to extract hero ID -> hero name tuples (or None in case of failure)
def parseNames(line: String) : Option[(Int, String)] = {
val fields = line.split('\"')
if (fields.length > 1) {
Some(fields(0).trim().toInt, fields(1))
} else None // flatmap will just discard None results, and extract data from Some results.
}
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkContext using every core of the local machine
val sc = new SparkContext("local[*]", "MostPopularSuperhero")
// Build up a hero ID -> name RDD
val names = sc.textFile("data/marvel-names.txt")
val namesRdd = names.flatMap(parseNames)
// Load up the superhero co-appearance data
val lines = sc.textFile("data/marvel-graph.txt")
// Convert to (heroID, number of connections) RDD
val pairings = lines.map(countCoOccurrences)
// Combine entries that span more than one line
val totalFriendsByCharacter = pairings.reduceByKey( (x,y) => x + y )
// Flip it to # of connections, hero ID
val flipped = totalFriendsByCharacter.map( x => (x._2, x._1) )
// Find the max # of connections
val mostPopular = flipped.max()
// Look up the name (lookup returns an array of results, so we need to access the first result with (0)).
val mostPopularName = namesRdd.lookup(mostPopular._2).head
// Print out our answer!
println(s"$mostPopularName is the most popular superhero with ${mostPopular._1} co-appearances.")
}
}
Example 5 : Most Obscure Superhero in a Social Graph
package com.sundogsoftware.spark
import org.apache.log4j._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
/** Find the superhero with the fewest co-appearances. */
object MostObscureSuperheroDataset {
case class SuperHeroNames(id: Int, name: String)
case class SuperHero(value: String)
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkSession using every core of the local machine
val spark = SparkSession
.builder
.appName("MostPopularSuperhero")
.master("local[*]")
.getOrCreate()
// Create schema when reading Marvel-names.txt
val superHeroNamesSchema = new StructType()
.add("id", IntegerType, nullable = true)
.add("name", StringType, nullable = true)
// Build up a hero ID -> name Dataset
import spark.implicits._
val names = spark.read
.schema(superHeroNamesSchema)
.option("sep", " ")
.csv("data/Marvel-names.txt")
.as[SuperHeroNames]
val lines = spark.read
.text("data/Marvel-graph.txt")
.as[SuperHero]
val connections = lines
.withColumn("id", split(col("value"), " ")(0))
.withColumn("connections", size(split(col("value"), " ")) - 1)
.groupBy("id").agg(sum("connections").alias("connections"))
val minConnectionCount = connections.agg(min("connections")).first().getLong(0)
val minConnections = connections.filter($"connections" === minConnectionCount)
val minConnectionsWithNames = minConnections.join(names, usingColumn = "id")
println("The following characters have only " + minConnectionCount + " connection(s):")
minConnectionsWithNames.select("name").show()
}
}
Example 6 : Superhero Degrees of Separation: Introducing Breadth-First Search
Computer Science Algorithm : Introducing Breadth-First Search
with RDD
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.util.LongAccumulator
import org.apache.log4j._
import scala.collection.mutable.ArrayBuffer
/** Finds the degrees of separation between two Marvel comic book characters, based
* on co-appearances in a comic.
*/
object DegreesOfSeparation {
// The characters we want to find the separation between.
val startCharacterID = 5306 //SpiderMan
val targetCharacterID = 14 //ADAM 3,031 (who?)
// We make our accumulator a "global" Option so we can reference it in a mapper later.
var hitCounter:Option[LongAccumulator] = None
// Some custom data types
// BFSData contains an array of hero ID connections, the distance, and color.
type BFSData = (Array[Int], Int, String)
// A BFSNode has a heroID and the BFSData associated with it.
type BFSNode = (Int, BFSData)
/** Converts a line of raw input into a BFSNode */
def convertToBFS(line: String): BFSNode = {
// Split up the line into fields
val fields = line.split("\\s+")
// Extract this hero ID from the first field
val heroID = fields(0).toInt
// Extract subsequent hero ID's into the connections array
var connections: ArrayBuffer[Int] = ArrayBuffer()
for (connection <- 1 until fields.length) {
connections += fields(connection).toInt
}
// Default distance and color is 9999 and white
var color:String = "WHITE"
var distance:Int = 9999
// Unless this is the character we're starting from
if (heroID == startCharacterID) {
color = "GRAY"
distance = 0
}
(heroID, (connections.toArray, distance, color))
}
/** Create "iteration 0" of our RDD of BFSNodes */
def createStartingRdd(sc:SparkContext): RDD[BFSNode] = {
val inputFile = sc.textFile("data/marvel-graph.txt")
inputFile.map(convertToBFS)
}
/** Expands a BFSNode into this node and its children */
def bfsMap(node:BFSNode): Array[BFSNode] = {
// Extract data from the BFSNode
val characterID:Int = node._1
val data:BFSData = node._2
val connections:Array[Int] = data._1
val distance:Int = data._2
var color:String = data._3
// This is called from flatMap, so we return an array
// of potentially many BFSNodes to add to our new RDD
var results:ArrayBuffer[BFSNode] = ArrayBuffer()
// Gray nodes are flagged for expansion, and create new
// gray nodes for each connection
if (color == "GRAY") {
for (connection <- connections) {
val newCharacterID = connection
val newDistance = distance + 1
val newColor = "GRAY"
// Have we stumbled across the character we're looking for?
// If so increment our accumulator so the driver script knows.
if (targetCharacterID == connection) {
if (hitCounter.isDefined) {
hitCounter.get.add(1)
}
}
// Create our new Gray node for this connection and add it to the results
val newEntry:BFSNode = (newCharacterID, (Array(), newDistance, newColor))
results += newEntry
}
// Color this node as black, indicating it has been processed already.
color = "BLACK"
}
// Add the original node back in, so its connections can get merged with
// the gray nodes in the reducer.
val thisEntry:BFSNode = (characterID, (connections, distance, color))
results += thisEntry
results.toArray
}
/** Combine nodes for the same heroID, preserving the shortest length and darkest color. */
def bfsReduce(data1:BFSData, data2:BFSData): BFSData = {
// Extract data that we are combining
val edges1:Array[Int] = data1._1
val edges2:Array[Int] = data2._1
val distance1:Int = data1._2
val distance2:Int = data2._2
val color1:String = data1._3
val color2:String = data2._3
// Default node values
var distance:Int = 9999
var color:String = "WHITE"
var edges:ArrayBuffer[Int] = ArrayBuffer()
// See if one is the original node with its connections.
// If so preserve them.
if (edges1.length > 0) {
edges ++= edges1
}
if (edges2.length > 0) {
edges ++= edges2
}
// Preserve minimum distance
if (distance1 < distance) {
distance = distance1
}
if (distance2 < distance) {
distance = distance2
}
// Preserve darkest color
if (color1 == "WHITE" && (color2 == "GRAY" || color2 == "BLACK")) {
color = color2
}
if (color1 == "GRAY" && color2 == "BLACK") {
color = color2
}
if (color2 == "WHITE" && (color1 == "GRAY" || color1 == "BLACK")) {
color = color1
}
if (color2 == "GRAY" && color1 == "BLACK") {
color = color1
}
if (color1 == "GRAY" && color2 == "GRAY") {
color = color1
}
if (color1 == "BLACK" && color2 == "BLACK") {
color = color1
}
(edges.toArray, distance, color)
}
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkContext using every core of the local machine
val sc = new SparkContext("local[*]", "DegreesOfSeparation")
// Our accumulator, used to signal when we find the target
// character in our BFS traversal.
hitCounter = Some(sc.longAccumulator("Hit Counter"))
var iterationRdd = createStartingRdd(sc)
for (iteration <- 1 to 10) {
println("Running BFS Iteration# " + iteration)
// Create new vertices as needed to darken or reduce distances in the
// reduce stage. If we encounter the node we're looking for as a GRAY
// node, increment our accumulator to signal that we're done.
val mapped = iterationRdd.flatMap(bfsMap)
// Note that mapped.count() action here forces the RDD to be evaluated, and
// that's the only reason our accumulator is actually updated.
println("Processing " + mapped.count() + " values.")
if (hitCounter.isDefined) {
val hitCount = hitCounter.get.value
if (hitCount > 0) {
println("Hit the target character! From " + hitCount +
" different direction(s).")
return
}
}
// Reducer combines data for each character ID, preserving the darkest
// color and shortest path.
iterationRdd = mapped.reduceByKey(bfsReduce)
}
}
}
with DataSet
package com.sundogsoftware.spark
import org.apache.spark.sql.functions._
import org.apache.log4j._
import org.apache.spark.sql.{Dataset, SparkSession}
/** Finds the degrees of separation between two Marvel comic book characters, based
* on co-appearances in a comic.
*/
object DegreesOfSeparationDataset {
// The characters we want to find the separation between.
val startCharacterID = 5306 //SpiderMan
val targetCharacterID = 14 //ADAM 3,031 (who?)
case class SuperHero(value: String)
case class BFSNode(id: Int, connections: Array[Int], distance: Int, color: String)
/** Create "iteration 0" of our RDD of BFSNodes */
def createStartingDs(spark:SparkSession): Dataset[BFSNode] = {
import spark.implicits._
val inputFile = spark.read
.text("data/Marvel-graph.txt")
.as[SuperHero]
// Parse the data so that the first element goes into the id column and all the rest go into the connections column as an array
val connections = inputFile
.withColumn("id", split(col("value"), " ")(0).cast("int"))
.withColumn("connections", slice(split(col("value"), " "), 2, 9999).cast("array<int>"))
.select("id", "connections")
// Add distance and color columns
val result = connections
.withColumn("distance",
when(col("id") === startCharacterID,0)
.when(col("id") =!= startCharacterID,9999))
.withColumn("color",
when(col("id") === startCharacterID,"GRAY")
.when(col("id") =!= startCharacterID,"WHITE")).as[BFSNode]
result
}
def exploreNode(spark:SparkSession, ds: Dataset[BFSNode], iteration: Int): (Dataset[BFSNode], Long) = {
import spark.implicits._
// Get all node which needs to be explored
val rawExploreDS = ds
.filter($"color" === "GRAY")
.select($"id", explode($"connections").alias("child")).distinct()
val hitCount = rawExploreDS.filter($"child" === targetCharacterID).count()
val exploreDS = rawExploreDS.distinct().select("child")
// All GRAY parents have now been explored after building exploreDS, so mark them as "BLACK"
val exploring = ds
.withColumn("color",
when(col("color") === "GRAY","BLACK")
.otherwise($"color")).as[BFSNode]
// Mark all explored nodes on this iteration which were not previously explored and set distance
val result = exploring
.join(exploreDS, exploring("color") === "WHITE" && exploring("id") === exploreDS("child"), "leftouter")
.withColumn("distance",
when(col("child").isNotNull, iteration)
.otherwise($"distance"))
.withColumn("color",
when(col("color") === "WHITE" && col("child").isNotNull, "GRAY")
.otherwise($"color"))
.select("id", "connections", "distance", "color").as[BFSNode]
(result, hitCount)
}
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkSession using every core of the local machine
val spark = SparkSession
.builder
.appName("DegreesOfSeparation")
.master("local[*]")
.getOrCreate()
// Counter used to signal when we find the target character in our BFS traversal.
var hitCount: Long = 0
// Build dataset
var iterationDs = createStartingDs(spark)
for (iteration <- 1 to 10) {
println("Running BFS Iteration# " + iteration)
val resultExplore = exploreNode(spark, iterationDs, iteration)
iterationDs = resultExplore._1
hitCount += resultExplore._2
if (hitCount > 0) {
println("Hit the target character! From " + hitCount +
" different direction(s).")
return
}
}
}
}
Example 7 : Item-Based Collaborative Filtering
Finding similar movies using Spark and the MovieLens data set
Caching or Persisting DataSets (or RDD / DataFrame)
- Any time you will perform more than one action on a dataset, you should cache it!
- Otherwise, Spark might re-evaluate the entire dataset all over again!
- Use .cache() or .persist() to do this (see the sketch below)
- persist() optionally lets you cache it to disk instead of just memory, just in case a node fails or something
- cache() is an Apache Spark transformation that can be used on a DataFrame, Dataset, or RDD when you want to perform more than one action
- cache() caches the specified DataFrame, Dataset, or RDD in the memory of your cluster’s workers
- cache() and persist() are used to cache intermediate results of an RDD, DataFrame, or Dataset; you mark it to be persisted by calling persist() or cache() on it
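A minimal sketch of the difference (the Dataset ds and RDD rdd here are assumed to already exist):
import org.apache.spark.storage.StorageLevel
// cache() keeps the results in memory the first time an action computes them
val cachedDs = ds.cache()
cachedDs.count() // first action: computes and caches
cachedDs.show()  // second action: served from the cache, no recomputation
// persist() lets you choose a storage level, e.g. spill to disk if memory is tight
val persistedRdd = rdd.persist(StorageLevel.MEMORY_AND_DISK)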
with DataSet
package com.sundogsoftware.spark
import org.apache.spark.sql.functions._
import org.apache.log4j._
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
object MovieSimilaritiesDataset {
case class Movies(userID: Int, movieID: Int, rating: Int, timestamp: Long)
case class MoviesNames(movieID: Int, movieTitle: String)
case class MoviePairs(movie1: Int, movie2: Int, rating1: Int, rating2: Int)
case class MoviePairsSimilarity(movie1: Int, movie2: Int, score: Double, numPairs: Long)
def computeCosineSimilarity(spark: SparkSession, data: Dataset[MoviePairs]): Dataset[MoviePairsSimilarity] = {
// Compute xx, xy and yy columns
val pairScores = data
.withColumn("xx", col("rating1") * col("rating1"))
.withColumn("yy", col("rating2") * col("rating2"))
.withColumn("xy", col("rating1") * col("rating2"))
// Compute numerator, denominator and numPairs columns
val calculateSimilarity = pairScores
.groupBy("movie1", "movie2")
.agg(
sum(col("xy")).alias("numerator"),
(sqrt(sum(col("xx"))) * sqrt(sum(col("yy")))).alias("denominator"),
count(col("xy")).alias("numPairs")
)
// Calculate score and select only needed columns (movie1, movie2, score, numPairs)
import spark.implicits._
val result = calculateSimilarity
.withColumn("score",
when(col("denominator") =!= 0, col("numerator") / col("denominator"))
.otherwise(null)
).select("movie1", "movie2", "score", "numPairs").as[MoviePairsSimilarity]
result
}
/** Get movie name by given movie id */
def getMovieName(movieNames: Dataset[MoviesNames], movieId: Int): String = {
val result = movieNames.filter(col("movieID") === movieId)
.select("movieTitle").collect()(0)
result(0).toString
}
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkSession using every core of the local machine
val spark = SparkSession
.builder
.appName("MovieSimilarities")
.master("local[*]")
.getOrCreate()
// Create schema when reading u.item
val moviesNamesSchema = new StructType()
.add("movieID", IntegerType, nullable = true)
.add("movieTitle", StringType, nullable = true)
// Create schema when reading u.data
val moviesSchema = new StructType()
.add("userID", IntegerType, nullable = true)
.add("movieID", IntegerType, nullable = true)
.add("rating", IntegerType, nullable = true)
.add("timestamp", LongType, nullable = true)
println("\nLoading movie names...")
import spark.implicits._
// Create a broadcast dataset of movieID and movieTitle.
// Apply ISO-8859-1 charset
val movieNames = spark.read
.option("sep", "|")
.option("charset", "ISO-8859-1")
.schema(moviesNamesSchema)
.csv("data/ml-100k/u.item")
.as[MoviesNames]
// Load up movie data as dataset
val movies = spark.read
.option("sep", "\t")
.schema(moviesSchema)
.csv("data/ml-100k/u.data")
.as[Movies]
val ratings = movies.select("userId", "movieId", "rating")
// Emit every movie rated together by the same user.
// Self-join to find every combination.
// Select movie pairs and rating pairs
val moviePairs = ratings.as("ratings1")
.join(ratings.as("ratings2"), $"ratings1.userId" === $"ratings2.userId" && $"ratings1.movieId" < $"ratings2.movieId")
.select($"ratings1.movieId".alias("movie1"),
$"ratings2.movieId".alias("movie2"),
$"ratings1.rating".alias("rating1"),
$"ratings2.rating".alias("rating2")
).as[MoviePairs]
val moviePairSimilarities = computeCosineSimilarity(spark, moviePairs).cache()
if (args.length > 0) {
val scoreThreshold = 0.97
val coOccurrenceThreshold = 50.0
val movieID: Int = args(0).toInt
// Filter for movies with this sim that are "good" as defined by
// our quality thresholds above
val filteredResults = moviePairSimilarities.filter(
(col("movie1") === movieID || col("movie2") === movieID) &&
col("score") > scoreThreshold && col("numPairs") > coOccurrenceThreshold)
// Sort by quality score.
val results = filteredResults.sort(col("score").desc).take(10)
println("\nTop 10 similar movies for " + getMovieName(movieNames, movieID))
for (result <- results) {
// Display the similarity result that isn't the movie we're looking at
var similarMovieID = result.movie1
if (similarMovieID == movieID) {
similarMovieID = result.movie2
}
println(getMovieName(movieNames, similarMovieID) + "\tscore: " + result.score + "\tstrength: " + result.numPairs)
}
}
}
}
Improve the results! Some ideas to try:
- Discard bad ratings - only recommend good movies (see the sketch after this list)
- Try different similarity metrics (Pearson Correlation Coefficient, Jaccard Coefficient, Conditional Probability)
- Adjust the threshold for minimum co-raters or minimum score
- Invent a new similarity metric that takes the number of co-raters into account
- Use genre info in u.item to boost scores from movies in the same genre
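For the first idea, a minimal sketch: filter the ratings Dataset inside main before the self-join, so only well-liked movies feed the similarity computation. The cutoff of 3 is an assumption, not something specified in the course.
// Hypothetical threshold: treat ratings of 3 and above as "good"
val goodRatings = ratings.filter(col("rating") >= 3)
// Build movie pairs exactly as before, but from goodRatings instead of ratings
val moviePairs = goodRatings.as("ratings1")
  .join(goodRatings.as("ratings2"), $"ratings1.userId" === $"ratings2.userId" && $"ratings1.movieId" < $"ratings2.movieId")
  .select($"ratings1.movieId".alias("movie1"),
    $"ratings2.movieId".alias("movie2"),
    $"ratings1.rating".alias("rating1"),
    $"ratings2.rating".alias("rating2")
  ).as[MoviePairs]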
6. Running Spark on a Cluster
6.1 Using spark-submit to run Spark driver scripts
- spark-submit reads a JAR file containing your compiled Spark application and distributes it to the whole cluster for execution
6.1.1 Using EMR, tuning performance on a cluster
6.1.2 Packaging and deploying your application
6.2 What is SBT (Scala Build Tool)?
- Like Maven for Scala
- Manages your library dependency tree for you
- Can package up all of your dependencies into a self-contained JAR
- If you have many dependencies, it makes life a lot easier than passing a ton of --jars options (a minimal build.sbt sketch follows this list)
- Get it from scala-sbt.org
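A minimal sketch of an sbt build for packaging a Spark application with the sbt-assembly plugin. The Scala and Spark version numbers are placeholders, not taken from the course.
// build.sbt - mark Spark itself as "provided" so it isn't bundled into the fat JAR
name := "SparkCourse"
version := "1.0"
scalaVersion := "2.12.15"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.3.0" % "provided",
  "org.apache.spark" %% "spark-sql"  % "3.3.0" % "provided"
)

// project/assembly.sbt - pulls in sbt-assembly so that `sbt assembly`
// produces one self-contained JAR of your code plus its non-provided dependencies
addSbtPlugin("com.eed4si9n" % "sbt-assembly" % "1.2.0")
Running sbt assembly then produces a JAR under the target/ directory that can be handed to spark-submit.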
6.3 Cluster Manager
- Spark’s built-in cluster manager
- Hadoop’s Yarn cluster manager
6.4 Spark-Submit Parameters (example invocation after this list)
- --master (yarn)
- --num-executors
- --executor-memory
- --total-executor-cores
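An example invocation as a sketch; the JAR name and resource numbers are placeholders, and the trailing 50 is the movie ID argument the similarity script expects.
spark-submit --class com.sundogsoftware.spark.MovieSimilaritiesDataset \
  --master yarn \
  --num-executors 4 \
  --executor-memory 2g \
  MovieSimilarities.jar 50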
6.5 .repartition(numPartitions = 100) on a DataFrame/Dataset, or .partitionBy() on an RDD (see the sketch below)
- Use at least as many partitions as you have executors
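A minimal sketch, reusing the moviePairs Dataset from the similarity example; the partition count of 100 is just illustrative.
// Increase parallelism before an expensive wide operation on a Dataset/DataFrame
val repartitionedPairs = moviePairs.repartition(100)
// For a key/value RDD, supply a partitioner instead
// (someKeyValueRdd is a hypothetical pair RDD):
// import org.apache.spark.HashPartitioner
// someKeyValueRdd.partitionBy(new HashPartitioner(100))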
6.6 Spark Optimization
- Aim for fewer stages and less data shuffling across the network
7. Machine Learning with Spark ML (uses DataFrame)
- Resources
- Advanced Analytics with Spark
- Examples in Spark SDK
- General ML courses
7.1 ML Capabilities
- Feature extraction
- Term Frequency / Inverse Document Frequency useful for search
- Basic statistics
- Chi-squared test, Pearson or Spearman correlation, min, max, mean, variance (see the correlation sketch after this list)
- Linear regression, logistic regression
- Support Vector Machines
- Naive Bayes classifier
- Decision trees
- K-Means clustering
- Principal component analysis, singular value decomposition
- Recommendations using Alternating Least Squares
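As an illustration of the basic-statistics capability above, a minimal sketch of computing a Pearson correlation matrix with spark.ml; the toy feature vectors are made up.
import org.apache.spark.ml.linalg.{Matrix, Vectors}
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.{Row, SparkSession}

object CorrelationSketch {
  def main(args: Array[String]) {
    val spark = SparkSession.builder.appName("CorrelationSketch").master("local[*]").getOrCreate()
    import spark.implicits._
    // Toy data: each row is a feature vector
    val df = Seq(
      Vectors.dense(1.0, 2.0),
      Vectors.dense(2.0, 4.1),
      Vectors.dense(3.0, 5.9)
    ).map(Tuple1.apply).toDF("features")
    // Pearson correlation matrix across the feature dimensions
    val Row(coeff: Matrix) = Correlation.corr(df, "features").head
    println(coeff)
    spark.stop()
  }
}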
7.2 Movie recommendations using ALS from Spark’s ML library
package com.sundogsoftware.spark
import org.apache.log4j._
import org.apache.spark.ml.recommendation._
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
import scala.collection.mutable
object MovieRecommendationsALSDataset {
case class MoviesNames(movieId: Int, movieTitle: String)
// Row format to feed into ALS
case class Rating(userID: Int, movieID: Int, rating: Float)
// Get movie name by given dataset and id
def getMovieName(movieNames: Array[MoviesNames], movieId: Int): String = {
val result = movieNames.filter(_.movieId == movieId)(0)
result.movieTitle
}
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Make a session
val spark = SparkSession
.builder
.appName("ALSExample")
.master("local[*]")
.getOrCreate()
println("Loading movie names...")
// Create schema when reading u.item
val moviesNamesSchema = new StructType()
.add("movieID", IntegerType, nullable = true)
.add("movieTitle", StringType, nullable = true)
// Create schema when reading u.data
val moviesSchema = new StructType()
.add("userID", IntegerType, nullable = true)
.add("movieID", IntegerType, nullable = true)
.add("rating", IntegerType, nullable = true)
.add("timestamp", LongType, nullable = true)
import spark.implicits._
// Create a broadcast dataset of movieID and movieTitle.
// Apply the ISO-8859-1 charset
val names = spark.read
.option("sep", "|")
.option("charset", "ISO-8859-1")
.schema(moviesNamesSchema)
.csv("data/ml-100k/u.item")
.as[MoviesNames]
val namesList = names.collect()
// Load up movie data as dataset
val ratings = spark.read
.option("sep", "\t")
.schema(moviesSchema)
.csv("data/ml-100k/u.data")
.as[Rating]
// Build the recommendation model using Alternating Least Squares
println("\nTraining recommendation model...")
val als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setUserCol("userID")
.setItemCol("movieID")
.setRatingCol("rating")
val model = als.fit(ratings)
// Get top-10 recommendations for the user we specified
val userID:Int = args(0).toInt
val users = Seq(userID).toDF("userID")
val recommendations = model.recommendForUserSubset(users, 10)
// Display them (oddly, this is the hardest part!)
println("\nTop 10 recommendations for user ID " + userID + ":")
for (userRecs <- recommendations) {
val myRecs = userRecs(1) // First column is userID, second is the recs
val temp = myRecs.asInstanceOf[mutable.WrappedArray[Row]] // Tell Scala what it is
for (rec <- temp) {
val movie = rec.getAs[Int](0)
val rating = rec.getAs[Float](1)
val movieName = getMovieName(namesList, movie)
println(movieName, rating)
}
}
// Stop the session
spark.stop()
}
}
7.3 Linear Regression with Spark.ML
package com.sundogsoftware.spark
import org.apache.spark.sql._
import org.apache.log4j._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.types._
object LinearRegressionDataFrameDataset {
case class RegressionSchema(label: Double, features_raw: Double)
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
.builder
.appName("LinearRegressionDF")
.master("local[*]")
.getOrCreate()
// Load up our page speed / amount spent data in the format required by MLLib
// (which is label, vector of features)
// In machine learning lingo, "label" is just the value you're trying to predict, and
// "feature" is the data you are given to make a prediction with. So in this example
// the "labels" are the first column of our data, and "features" are the second column.
// You can have more than one "feature" which is why a vector is required.
val regressionSchema = new StructType()
.add("label", DoubleType, nullable = true)
.add("features_raw", DoubleType, nullable = true)
import spark.implicits._
val dsRaw = spark.read
.option("sep", ",")
.schema(regressionSchema)
.csv("data/regression.txt")
.as[RegressionSchema]
val assembler = new VectorAssembler().
setInputCols(Array("features_raw")).
setOutputCol("features")
val df = assembler.transform(dsRaw)
.select("label","features")
// Let's split our data into training data and testing data
val trainTest = df.randomSplit(Array(0.5, 0.5))
val trainingDF = trainTest(0)
val testDF = trainTest(1)
// Now create our linear regression model
val lir = new LinearRegression()
.setRegParam(0.3) // regularization
.setElasticNetParam(0.8) // elastic net mixing
.setMaxIter(100) // max iterations
.setTol(1E-6) // convergence tolerance
// Train the model using our training data
val model = lir.fit(trainingDF)
// Now see if we can predict values in our test data.
// Generate predictions using our linear regression model for all features in our
// test dataframe:
val fullPredictions = model.transform(testDF).cache()
// This basically adds a "prediction" column to our testDF dataframe.
// Extract the predictions and the "known" correct labels.
val predictionAndLabel = fullPredictions.select("prediction", "label").collect()
// Print out the predicted and actual values for each point
for (prediction <- predictionAndLabel) {
println(prediction)
}
// Stop the session
spark.stop()
}
}
7.4 Predict Real Estate with Decision Trees in Spark.ML
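No code was captured for this exercise in these notes; below is a minimal hedged sketch of decision-tree regression with Spark ML. The file path (data/realestate.csv) and the column names are assumptions, not taken from the course material.
package com.sundogsoftware.spark
import org.apache.log4j._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object RealEstateDecisionTree {
  def main(args: Array[String]) {
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark = SparkSession
      .builder
      .appName("DecisionTreeDF")
      .master("local[*]")
      .getOrCreate()
    // Assumed input: a CSV with a header row and numeric columns
    val dsRaw = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("data/realestate.csv")
    // Bundle the (assumed) predictor columns into a single "features" vector
    val assembler = new VectorAssembler()
      .setInputCols(Array("HouseAge", "DistanceToMRT", "NumberConvenienceStores"))
      .setOutputCol("features")
    val df = assembler.transform(dsRaw)
      .select(col("PriceOfUnitArea").alias("label"), col("features"))
    // Split into training and test sets
    val Array(trainingDF, testDF) = df.randomSplit(Array(0.8, 0.2))
    // Train a decision tree regressor and predict on the held-out data
    val dtr = new DecisionTreeRegressor()
    val model = dtr.fit(trainingDF)
    val predictionAndLabel = model.transform(testDF).select("prediction", "label")
    predictionAndLabel.show(10)
    // Stop the session
    spark.stop()
  }
}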
8. Intro to Spark Streaming
- Processes streaming sources of data (unlike batch data)
- Example : analyzing log data in real time and setting up alarms
- Sources : data fed to a port, Amazon Kinesis, HDFS, Kafka, Flume and others
- Checkpointing : stores state to disk periodically for fault tolerance
8.1 Old API : The DStream API for Spark Streaming
- Creates micro-batches (of, say, 1 second's worth of data) and processes each one as an RDD
- window(), reduceByWindow(), reduceByKeyAndWindow()
val hashtagCounts = hashtagKeyValues.reduceByKeyAndWindow((x,y) => x + y, (x,y) => x - y, Seconds(300), Seconds(1))
(#BoatyMcBoatFace, 2)
(#WhatIceberg, 1)
Example 1 : Twitter Streaming API and Spark DStream
package com.sundogsoftware.spark
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
/** Listens to a stream of Tweets and keeps track of the most popular
* hashtags over a 5 minute window.
*/
object PopularHashtags {
/** Makes sure only ERROR messages get logged to avoid log spam. */
def setupLogging(): Unit = {
import org.apache.log4j.{Level, Logger}
val rootLogger = Logger.getRootLogger
rootLogger.setLevel(Level.ERROR)
}
/** Configures Twitter service credentials using twitter.txt in the main workspace directory */
def setupTwitter(): Unit = {
import scala.io.Source
val lines = Source.fromFile("data/twitter.txt")
for (line <- lines.getLines) {
val fields = line.split(" ")
if (fields.length == 2) {
System.setProperty("twitter4j.oauth." + fields(0), fields(1))
}
}
lines.close()
}
/** Our main function where the action happens */
def main(args: Array[String]) {
// Configure Twitter credentials using twitter.txt
setupTwitter()
// Set up a Spark streaming context named "PopularHashtags" that runs locally using
// all CPU cores and one-second batches of data
val ssc = new StreamingContext("local[*]", "PopularHashtags", Seconds(1))
// Get rid of log spam (should be called after the context is set up)
setupLogging()
// Create a DStream from Twitter using our streaming context
val tweets = TwitterUtils.createStream(ssc, None)
// Now extract the text of each status update into DStreams using map()
val statuses = tweets.map(status => status.getText)
// Blow out each word into a new DStream
val tweetwords = statuses.flatMap(tweetText => tweetText.split(" "))
// Now eliminate anything that's not a hashtag
val hashtags = tweetwords.filter(word => word.startsWith("#"))
// Map each hashtag to a key/value pair of (hashtag, 1) so we can count them up by adding up the values
val hashtagKeyValues = hashtags.map(hashtag => (hashtag, 1))
// Now count them up over a 5 minute window sliding every one second
val hashtagCounts = hashtagKeyValues.reduceByKeyAndWindow( (x,y) => x + y, (x,y) => x - y, Seconds(300), Seconds(1))
// You will often see this written in the following shorthand:
//val hashtagCounts = hashtagKeyValues.reduceByKeyAndWindow( _ + _, _ - _, Seconds(300), Seconds(1))
// Sort the results by the count values
val sortedResults = hashtagCounts.transform(rdd => rdd.sortBy(x => x._2, ascending = false))
// Print the top 10
sortedResults.print
// Set a checkpoint directory, and kick it all off
// I could watch this all day!
ssc.checkpoint("C:/checkpoint/")
ssc.start()
ssc.awaitTermination()
}
}
8.2 New API : The Structured Streaming API for Spark Streaming
- Models the stream as an unbounded Dataset / DataFrame
- The Dataset just keeps getting appended to as new data arrives, and queries run continuously against it
Example 2 : Structured Streaming API and Logs Data
package com.sundogsoftware.spark
import org.apache.log4j._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
/** Keeps a running count of HTTP status codes from Apache access logs streamed into data/logs. */
object StructuredStreaming {
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
.builder
.appName("StructuredStreaming")
.master("local[*]")
.getOrCreate()
// Streaming source that monitors the data/logs directory for text files
val accessLines = spark.readStream.text("data/logs")
// Regular expressions to extract pieces of Apache access log lines
val contentSizeExp = "\\s(\\d+)$"
val statusExp = "\\s(\\d{3})\\s"
val generalExp = "\"(\\S+)\\s(\\S+)\\s*(\\S*)\""
val timeExp = "\\[(\\d{2}/\\w{3}/\\d{4}:\\d{2}:\\d{2}:\\d{2} -\\d{4})]"
val hostExp = "(^\\S+\\.[\\S+\\.]+\\S+)\\s"
// Apply these regular expressions to create structure from the unstructured text
val logsDF = accessLines.select(regexp_extract(col("value"), hostExp, 1).alias("host"),
regexp_extract(col("value"), timeExp, 1).alias("timestamp"),
regexp_extract(col("value"), generalExp, 1).alias("method"),
regexp_extract(col("value"), generalExp, 2).alias("endpoint"),
regexp_extract(col("value"), generalExp, 3).alias("protocol"),
regexp_extract(col("value"), statusExp, 1).cast("Integer").alias("status"),
regexp_extract(col("value"), contentSizeExp, 1).cast("Integer").alias("content_size"))
// Keep a running count of status codes
val statusCountsDF = logsDF.groupBy("status").count()
// Display the stream to the console
val query = statusCountsDF.writeStream.outputMode("complete").format("console").queryName("counts").start()
// Wait until we terminate the scripts
query.awaitTermination()
// Stop the session
spark.stop()
}
}
Example 3 : Windows Operations with Structured Streaming
package com.sundogsoftware.spark
import org.apache.log4j._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
/** Tracks the most-hit URLs over a sliding window from streaming Apache access logs. */
object TopURLs {
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
.builder
.appName("StructuredStreaming")
.master("local[*]")
.getOrCreate()
// Streaming source that monitors the data/logs directory for text files
val accessLines = spark.readStream.text("data/logs")
// Regular expressions to extract pieces of Apache access log lines
val contentSizeExp = "\\s(\\d+)$"
val statusExp = "\\s(\\d{3})\\s"
val generalExp = "\"(\\S+)\\s(\\S+)\\s*(\\S*)\""
val timeExp = "\\[(\\d{2}/\\w{3}/\\d{4}:\\d{2}:\\d{2}:\\d{2} -\\d{4})]"
val hostExp = "(^\\S+\\.[\\S+\\.]+\\S+)\\s"
// Apply these regular expressions to create structure from the unstructured text
val logsDF = accessLines.select(regexp_extract(col("value"), hostExp, 1).alias("host"),
regexp_extract(col("value"), timeExp, 1).alias("timestamp"),
regexp_extract(col("value"), generalExp, 1).alias("method"),
regexp_extract(col("value"), generalExp, 2).alias("endpoint"),
regexp_extract(col("value"), generalExp, 3).alias("protocol"),
regexp_extract(col("value"), statusExp, 1).cast("Integer").alias("status"),
regexp_extract(col("value"), contentSizeExp, 1).cast("Integer").alias("content_size"))
val logsDF2 = logsDF.withColumn("eventTime", current_timestamp())
// Keep a running count of endpoints
val endpointCounts = logsDF2.groupBy(window(col("eventTime"),
"30 seconds", "10 seconds"), col("endpoint")).count()
val sortedEndpointCounts = endpointCounts.orderBy(col("count").desc)
// Display the stream to the console
val query = sortedEndpointCounts.writeStream.outputMode("complete").format("console")
.queryName("counts").start()
// Wait until we terminate the scripts
query.awaitTermination()
// Stop the session
spark.stop()
}
}
9. GraphX, Pregel and Breadth-First Search
- Introduces VertexRDD and EdgeRDD, and the Edge data type
- Otherwise, GraphX code looks like any other Spark code for the most part
Example Uses:
- It can measure things like “connectedness”, degree distribution, average path length, and triangle counts - high-level measures of a graph
- It can count triangles in the graph and apply the PageRank algorithm (see the short sketch after the code example below)
- It can also join graphs together and transform graphs quickly
- For things like our “degrees of separation” example there is no pre-built support, but GraphX does provide the Pregel API for traversing a graph
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.log4j._
import org.apache.spark.graphx._
/** Some examples of GraphX in action with the Marvel superhero dataset! */
object GraphX {
// Function to extract hero ID -> hero name tuples (or None in case of failure)
def parseNames(line: String) : Option[(VertexId, String)] = {
val fields = line.split('\"')
if (fields.length > 1) {
val heroID:Long = fields(0).trim().toLong
if (heroID < 6487) { // ID's above 6486 aren't real characters
return Some((heroID, fields(1)))
}
}
None // flatmap will just discard None results, and extract data from Some results.
}
/** Transform an input line from marvel-graph.txt into a List of Edges */
def makeEdges(line: String) : List[Edge[Int]] = {
import scala.collection.mutable.ListBuffer
var edges = new ListBuffer[Edge[Int]]()
val fields = line.split(" ")
val origin = fields(0)
for (x <- 1 until (fields.length - 1)) {
// Our attribute field is unused, but in other graphs could
// be used to keep track of physical distances, etc.
edges += Edge(origin.toLong, fields(x).toLong, 0)
}
edges.toList
}
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkContext using every core of the local machine
val sc = new SparkContext("local[*]", "GraphX")
// Build up our vertices
val names = sc.textFile("data/marvel-names.txt")
val verts = names.flatMap(parseNames)
// Build up our edges
val lines = sc.textFile("data/marvel-graph.txt")
val edges = lines.flatMap(makeEdges)
// Build up our graph, and cache it as we're going to do a bunch of stuff with it.
val default = "Nobody"
val graph = Graph(verts, edges, default).cache()
// Find the top 10 most-connected superheroes, using graph.degrees:
println("\nTop 10 most-connected superheroes:")
// The join merges the hero names into the output; sorts by total connections on each node.
graph.degrees.join(verts).sortBy(_._2._1, ascending=false).take(10).foreach(println)
// Now let's do Breadth-First Search using the Pregel API
println("\nComputing degrees of separation from SpiderMan...")
// Start from SpiderMan
val root: VertexId = 5306 // SpiderMan
// Initialize each node with a distance of infinity, unless it's our starting point
val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else Double.PositiveInfinity)
// Now the Pregel magic
val bfs = initialGraph.pregel(Double.PositiveInfinity, 10)(
// Our "vertex program" preserves the shortest distance
// between an inbound message and its current value.
// It receives the vertex ID we are operating on,
// the attribute already stored with the vertex, and
// the inbound message from this iteration.
(id, attr, msg) => math.min(attr, msg),
// Our "send message" function propagates out to all neighbors
// with the distance incremented by one.
triplet => {
if (triplet.srcAttr != Double.PositiveInfinity) {
Iterator((triplet.dstId, triplet.srcAttr+1))
} else {
Iterator.empty
}
},
// The "reduce" operation preserves the minimum
// of messages received by a vertex if multiple
// messages are received by one vertex
(a,b) => math.min(a,b) ).cache()
// Print out the first 100 results:
bfs.vertices.join(verts).take(100).foreach(println)
// Recreate our "degrees of separation" result:
println("\n\nDegrees from SpiderMan to ADAM 3,031") // ADAM 3031 is hero ID 14
bfs.vertices.filter(x => x._1 == 14).collect.foreach(println)
}
}
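The example above doesn't exercise the PageRank or triangle-count capabilities mentioned earlier; here is a minimal sketch (not from the course) that would go inside main after the graph is built and cached, reusing the graph and verts values.
// PageRank score per hero (0.001 is the convergence tolerance), joined with names
val ranks = graph.pageRank(0.001).vertices
ranks.join(verts).sortBy(_._2._1, ascending = false).take(10).foreach(println)
// Number of triangles each vertex participates in - a rough measure of local clustering
val triangles = graph.triangleCount().vertices
triangles.join(verts).take(10).foreach(println)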
10. More on Spark
- Learning Spark by O’Reilly
- Learning Scala by O’Reilly
- Advanced Analytics with Spark by O’Reilly
- https://spark.apache.org/