Apache Spark & Scala


1. Apache Spark:

  • Fast & general engine for large scale data processing
  • Spark can run on a Hadoop cluster to spread out massive data analysis and machine learning tasks
  • Distribute massive data processing across fleet of computers in parallel
  • Spark is in most cases a better alternative to Hadoop’s MapReduce component. Spark can run on top of a Hadoop cluster, taking advantage of its distributed file system (HDFS) and YARN cluster manager, but it also has its own built-in cluster manager so it can run outside of Hadoop. It is not a complete replacement for Hadoop; Spark and Hadoop can co-exist.

1.1 Course Material

  • Install Java, IntelliJ, Scala Plugin, and course materials
  • Install JDK 14 from Oracle Archive
  • Install IntelliJ
  • Add environment variable if using Windows OS
    • Variable : HADOOP_HOME = c:\hadoop
    • Add to path : %HADOOP_HOME%\bin
  • https://www.sundog-education.com/sparkscala/

1.2 Spark Architecture Notes

  • Driver Program - Spark Context
  • Cluster Manager - Inbuilt Spark Cluster Manager or Hadoop’s YARN Cluster Manager (orchestrate the cluster of computers)
    • How to spin up the resources you need
    • How to distribute the work
    • Where to put the different jobs in the most optimal place
    • Where the data is most accessible
  • Multiple Executor - Cache, Tasks
  • Spark can run on top of Hadoop but doesn’t have to
  • Individual nodes or executors - has its own Cache, has its own Task
  • 10 to 100 times faster than Hadoop MapReduce
  • Fast largely due to its DAG (Directed Acyclic Graph) engine, which optimizes workflows
  • More flexible than MapReduce, and keeps more work in memory
  • Mature technology & widely adopted
  • Spark SQL on Datasets and DataFrames (higher level API)
  • Resilient Distributed Dataset (original & lower level API)
  • Components:
    • Spark Core (lower level API)
      • Spark Streaming (higher level API) - Real time monitoring & data analysis
      • Spark SQL (higher level API) - Structured data
      • MLlib (higher level API) - Distributed ML but somewhat limited
      • GraphX (higher level API) - Graph / social network analysis, but not especially well maintained
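
A minimal sketch (not from the course) of how the pieces above fit together in code: the driver creates a SparkContext, the master URL tells the cluster manager where to run, and the transformations are split into tasks that run inside executors. "local[*]" is a placeholder master; on a real cluster it might be "yarn" or a spark:// URL.

import org.apache.spark.{SparkConf, SparkContext}

object ArchitectureSketch {
  def main(args: Array[String]): Unit = {
    // The driver program owns the SparkContext
    val conf = new SparkConf()
      .setMaster("local[*]")          // handed to the cluster manager (built-in local scheduler here)
      .setAppName("ArchitectureSketch")
    val sc = new SparkContext(conf)

    // The work below is broken into tasks that run inside executors
    val doubledCount = sc.parallelize(1 to 1000).map(_ * 2).count()
    println(doubledCount)

    sc.stop()
  }
}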

1.3 Why Scala?

  • Spark itself is written in Scala
  • Scala’s functional programming model is a good fit for distributed processing
  • Enforces writing code in a way such that functions can easily be distributed across an entire cluster
  • More likely to be easily parallelized
  • Gives you fast performance (Scala compiles to Java bytecode)
  • Less code & boilerplate stuff than Java
  • Python is slower in comparison (though not as slow as it used to be)

But

  • You probably don’t know Scala
  • So let’s learn the basics

Python Equivalent

nums = sc.parallelize([1,2,3,4])
squared = nums.map(lambda x: x * x).collect()

Scala Equivalent

val nums = sc.parallelize(List(1,2,3,4))
val squared = nums.map(x => x * x).collect() 

2. Scala Basics

  • Yet another programming language
  • Uniquely suited for Spark because it’s structured in a way that lends itself to distributed processing over a cluster
  • Faster & more reliable than Python
  • It’s what Spark is built with:
  • New features are often Scala-first
  • You can also program Spark with Python or Java
  • Runs on top of the Java Virtual Machine (JVM)
  • Can access Java classes
  • Compiles down to Java byte code
  • Functional programming

2.1 Declaring Immutable Values & Mutable Variables

   // VALUES are immutable constants.
   // Immutable - An object whose state cannot be changed after it has been defined
   // Defined using one line or one atomic operation
   val hello: String = "Hola!"
   // Stick to immutable constants - avoids thread-safety issues caused by multiple threads trying to change a value at the same time

   // VARIABLES are mutable
   // Mutable - An object whose state can be changed after it has been defined
   var helloThere: String = hello
   helloThere = hello + " There!"
   println(helloThere)

   val immutableHelloThere = hello + " There"
   println(immutableHelloThere)

   // Data Types

   val numberOne: Int = 1
   val truth: Boolean = true
   val letterA: Char = 'a'
   val pi: Double = 3.14159265
   val piSinglePrecision: Float = 3.14159265f
   val bigNumber: Long = 123456789
   val smallNumber: Byte = 127

   println("Here is a mess: " + numberOne + truth + letterA + pi + bigNumber)

   println(f"Pi is about $piSinglePrecision%.3f")
   println(f"Zero padding on the left: $numberOne%05d")

   println(s"I can use the s prefix to use variables like $numberOne $truth $letterA")

   println(s"The s prefix isn't limited to variables; I can include any expression. Like ${1+2}")

   val theUltimateAnswer: String = "To life, the universe, and everything is 42."
   val pattern = """.* ([\d]+).*""".r
   val pattern(answerString) = theUltimateAnswer
   val answer = answerString.toInt
   println(answer)

   // Booleans
   val isGreater = 1 > 2
   val isLesser = 1 < 2
   val impossibleBitwise = isGreater & isLesser // single & evaluates both sides
   val impossible = isGreater && isLesser // logical and (short-circuits)
   val anotherWay = isGreater || isLesser

   val picard: String = "Picard"
   val bestCaptain: String = "Picard"
   val isBest: Boolean = picard == bestCaptain

2.2 Flow Control in Scala

// Flow control

// If / else:
if (1 > 3) println("Impossible!") else println("The world makes sense.")

if (1 > 3) {
  println("Impossible!")
  println("Really?")
} else {
  println("The world makes sense.")
  println("still.")
}

// Matching
val number = 2
number match {
  case 1 => println("One")
  case 2 => println("Two")
  case 3 => println("Three")
  case _ => println("Something else")
}

for (x <- 1 to 4) {
  val squared = x * x
  println(squared)
}

var x = 10
while (x >= 0) {
  println(x)
  x -= 1
}

x = 0
do { println(x); x+=1 } while (x <= 10)

// Expressions

{val x = 10; x + 20}

println({val x = 10; x + 20})

2.3 Functions in Scala

// Functions

// format def <function name>(parameter name: type...) : return type = { }

def squareIt(x: Int) : Int = {
  x * x
}

def cubeIt(x : Int) : Int = {x * x * x}

println(squareIt(2))

println(cubeIt(3))

// Function which takes an integer and another function as parameter; then returns an integer
// Important concept in functional programming
def transformInt(x: Int, f: Int => Int): Int = {
  f(x)
}

val result = transformInt(2, cubeIt)
println(result)

// Lambda Function, Anonymous Inline Function, Function Literal
transformInt(3, x => x * x * x)

transformInt(10, x => x / 2)

transformInt(2, x => {val y = x * 2; y * y})

2.4 Data Structures in Scala

// Data structures

// Tuples
// Immutable lists

val captainStuff = ("Picard", "Enterprise-D", "NCC-1701-D")
println(captainStuff)

// Refer to the individual fields with a ONE-BASED index
println(captainStuff._1)
println(captainStuff._2)
println(captainStuff._3)

// Create key-value pair
val picardsShip = "Picard" -> "Enterprise-D"
println(picardsShip._2)

// Tuples can hold items of different types
val aBunchOfStuff = ("Kirk", 1964, true)

// Lists
// Like a tuple, but more functionality
// All elements must be of the same type

val shipList = List("Enterprise", "Defiant", "Voyager", "Deep Space Nine")

println(shipList(1))
// Unlike Tuples which have one-based indexing, Lists have zero-based indexing

println(shipList.head) // First item in list
println(shipList.tail) // All items excluding the first

for (ship <- shipList) {println(ship)}

// Map is very important in the world of functional programming
val backwardShips = shipList.map( (ship: String) => {ship.reverse})
for (ship <- backwardShips) {println(ship)}

// reduce() to combine together all the items in a collection using some function
val numberList = List(1, 2, 3, 4, 5)
val sum = numberList.reduce( (x: Int, y: Int) => x + y)
println(sum)
// returns 15

// filter() removes stuff
val iHateFives = numberList.filter( (x: Int) => x != 5)
// shorthand expression
val iHateThrees = numberList.filter(_ != 3)

// Concatenate lists
val moreNumbers = List(6,7,8)
val lotsOfNumbers = numberList ++ moreNumbers

val reversed = numberList.reverse
val sorted = reversed.sorted
val lotsOfDuplicates = numberList ++ numberList
val distinctValues = lotsOfDuplicates.distinct
val maxValue = numberList.max
val total = numberList.sum
val hasThree = iHateThrees.contains(3)

// MAPS (in other languages known as : dictionaries or key-value lookups)
val shipMap = Map("Kirk" -> "Enterprise", "Picard" -> "Enterprise-D", "Sisko" -> "Deep Space Nine", "Janeway" -> "Voyager")
println(shipMap("Janeway"))
println(shipMap.contains("Archer"))

// Exception handling
val archersShip = util.Try(shipMap("Archer")) getOrElse "Unknown"
println(archersShip)

3. Using Resilient Distributed Datasets

3.1 RDD - Original & Old API (Spark Roots)

  • Resilient
  • Distributed
  • Dataset

3.1.1 Spark Context

  • Created by your driver program
  • Is responsible for making RDD’s resilient and distributed
  • The Spark shell creates a “sc” object for you
val nums = sc.parallelize(List(1,2,3,4))
val lines = sc.textFile("file:///c:/users/frank/gobs-o-text.txt")
// textFile can also read from s3n:// or hdfs:// URIs

val hiveCtx = new HiveContext(sc)
val rows = hiveCtx.sql("SELECT name, age FROM users")

// Can also create RDD from JDBC, NoSQL Databases (Cassandra, HBase), Elasticsearch, JSON, .csv, various compressed datasets
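
A short, hedged sketch putting the creation methods above into one runnable script; the file path is a placeholder and assumes the file exists locally.

import org.apache.spark.SparkContext

object RddCreationSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[*]", "RddCreationSketch")

    // From an in-memory collection
    val nums = sc.parallelize(List(1, 2, 3, 4))

    // From a text file (could also be an s3n:// or hdfs:// URI)
    val lines = sc.textFile("data/book.txt")

    println(nums.count() + " numbers, " + lines.count() + " lines")
    sc.stop()
  }
}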

3.1.2 Transforming RDD

  • map
  • flatMap
  • filter
  • distinct
  • sample
  • union, intersection, subtract, cartesian
val rdd = sc.parallelize(List(1,2,3,4))

// Functional Programming : many RDD methods accept functions
// Functional Programming : passing functions around
// Scala forcing you to write functions that can be distributed
val squares = rdd.map(x => x * x)

// Equivalent, passing a named function instead of an inline lambda
val squaresViaNamedFunction = rdd.map(squareIt)

// This yields 1, 4, 9, 16
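
A hedged sketch of the remaining transformations listed above, reusing the rdd defined just before and assuming a SparkContext named sc is in scope (e.g. in the Spark shell):

// flatMap: one input element can produce many output elements
val words = sc.parallelize(List("the quick", "brown fox")).flatMap(line => line.split(" "))

// filter: keep only elements matching a predicate
val evens = rdd.filter(x => x % 2 == 0)

// distinct, sample, and the set-style operations
val unique  = rdd.union(rdd).distinct()
val sampled = rdd.sample(withReplacement = false, fraction = 0.5)
val crossed = rdd.cartesian(sc.parallelize(List("a", "b")))  // also intersection, subtract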

3.1.3 Actions on RDD

  • An action is something that causes your RDD to collapse and gives a result back to the driver script
    • collect
    • count
    • countByValue
    • take
    • top
    • reduce
  • Lazy Evaluation - Nothing actually happens in your driver program until an action is called
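
A quick, hedged illustration of these actions, assuming the squares RDD from the previous section; none of the transformations above actually run until one of these calls is made.

val everything   = squares.collect()       // bring the whole RDD back to the driver
val howMany      = squares.count()         // number of elements
val histogram    = squares.countByValue()  // Map of value -> number of occurrences
val firstTwo     = squares.take(2)
val biggestThree = squares.top(3)
val total        = squares.reduce((x, y) => x + y)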

3.1.4 Ratings histogram example

// Import what you need
package com.sundogsoftware.spark

// Import the packages that we need
import org.apache.spark._ // Everything from Spark
import org.apache.log4j._ // Everything from Log4j; Adjust our error level and logging

// Structure every driver script needs to conform to
/** Count up how many of each star rating exists in the MovieLens 100K data set. */
object RatingsCounter {
 
  /** Our main function where the action happens */
  def main(args: Array[String]) {
   
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)
        
    // Create a SparkContext using every core of the local machine, named RatingsCounter
    // The * means Spark will parallelize across every CPU core of the local machine
    val sc = new SparkContext("local[*]", "RatingsCounter")
   
    // Load up each line of the ratings data into an RDD
    val lines = sc.textFile("data/ml-100k/u.data")
    
    // Convert each line to a string, split it out by tabs, and extract the third field.
    // (The file format is userID, movieID, rating, timestamp)
    val ratings = lines.map(x => x.split("\t")(2))
    
    // Count up how many times each value (rating) occurs
    // Calling an action
    val results = ratings.countByValue()
    
    // Sort the resulting map of (rating, count) tuples
    val sortedResults = results.toSeq.sortBy(_._1)
    
    // Print each result on its own line.
    sortedResults.foreach(println)
  }
}

// Returns :
// (1,6110)
// (2,11370)
// (3,27145)
// (4,34174)
// (5,21201)

3.1.5 Spark Internals

  • Spark created an execution plan using DAG when the action is called
  • Job is broken into Stages (based on when data needs to be re-organized), Stages broken into Tasks
  • Finally the tasks are scheduled across your cluster and executed
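
One way to peek at this (a hedged sketch, assuming a SparkContext named sc): RDD.toDebugString prints the lineage Spark will turn into the DAG, and the shuffle introduced by reduceByKey is where a new stage begins.

val pairs = sc.textFile("data/book.txt")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
val counts = pairs.reduceByKey((x, y) => x + y)  // requires a shuffle -> stage boundary

println(counts.toDebugString)  // shows the plan; nothing has executed yet
println(counts.count())        // the action that actually triggers the job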

3.1.6 Special RDDs : Key-Value RDDs

  • For example: number of friends by age
  • Key is age, Value is number of friends
  • Instead of just a list of ages or a list of # of friends, we can store them as key-value RDDs
  • Spark can do special stuff with key-value data
  • You can also do SQL-style joins (possible, but not commonly done in modern Spark)
  • Mapping just the values of a key-value RDD
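
A small, hedged sketch of those key-value specifics, assuming sc is in scope; the ages and friend counts are made-up illustration data.

val friendsByAge = sc.parallelize(List((33, 385), (33, 2), (55, 221)))

// mapValues transforms only the value; the key (and partitioning) stays intact
val plusOne = friendsByAge.mapValues(numFriends => numFriends + 1)

// SQL-style joins on the key are possible too
val namesByAge = sc.parallelize(List((33, "Sam"), (55, "Alex")))
val joined     = friendsByAge.join(namesByAge)   // RDD of (age, (numFriends, name))
joined.collect().foreach(println)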

3.1.7 Example 1 : Average friends by Age

// key: x | value: 1
val totalsByAge = rdd.map(x => (x, 1))

// Spark can do special stuff with key-value data
reduceByKey((x, y) => x + y) : combine values with the same key using some function
groupByKey() : Group values with the same key
sortByKey() : Sort RDD by key values
keys(), values() : Create an RDD of just the keys, or just the values
package com.sundogsoftware.spark

import org.apache.spark._
import org.apache.log4j._

/** Compute the average number of friends by age in a social network. */
object FriendsByAge {
  
  /** A function that splits a line of input into (age, numFriends) tuples. */
  def parseLine(line: String): (Int, Int) = {
      // Split by commas
      val fields = line.split(",")
      // Extract the age and numFriends fields, and convert to integers
      val age = fields(2).toInt
      val numFriends = fields(3).toInt
      // Create a tuple that is our result.
      (age, numFriends)
  }
  
  /** Our main function where the action happens */
  def main(args: Array[String]) {
   
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)
        
    // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "FriendsByAge")
  
    // Load each line of the source data into an RDD
    val lines = sc.textFile("data/fakefriends-noheader.csv")
    
    // Use our parseLines function to convert to (age, numFriends) tuples
    val rdd = lines.map(parseLine)
    // Returns Tuples:
    // 33, 385
    // 33, 2
    // 55, 221
    // 40, 465
    
    // Lots going on here...
    // We are starting with an RDD of form (age, numFriends) where age is the KEY and numFriends is the VALUE
    // We use mapValues to convert each numFriends value to a tuple of (numFriends, 1)
    // Then we use reduceByKey to sum up the total numFriends and total instances for each age, by
    // adding together all the numFriends values and 1's respectively.
    val totalsByAge = rdd.mapValues(x => (x, 1)).reduceByKey( (x,y) => (x._1 + y._1, x._2 + y._2))
    
    // rdd.mapValues(x => (x, 1))
    // Returns:
    // (33, (385, 1))
    // (33, (2, 1))
    // (55, (221, 1))
    // (40, (465, 1))
    
    // reduceByKey((x,y) => (x._1 + y._1, x._2 + y._2))
    // Returns:
    // (33, (387, 2))
    // (55, (221, 1))
    // (40, (465, 1))    
    
    // So now we have tuples of (age, (totalFriends, totalInstances))
    // To compute the average we divide totalFriends / totalInstances for each age.
    val averagesByAge = totalsByAge.mapValues(x => x._1 / x._2)
    
    // Collect the results from the RDD (This kicks off computing the DAG and actually executes the job)
    val results = averagesByAge.collect()
    
    // Returns (note: integer division truncates 387 / 2):
    // (33, 193)
    // (55, 221)
    // (40, 465)    
    
    // Sort and print the final results.
    results.sorted.foreach(println)
  }
    
}

3.1.8 Example 2 : Minimum Temperature by Location

val minTemps = parsedLines.filter(x => x._2 == "TMIN")
package com.sundogsoftware.spark

import org.apache.spark._
import org.apache.log4j._
import scala.math.min

/** Find the minimum temperature by weather station */
object MinTemperatures {
  
  def parseLine(line:String): (String, String, Float) = {
    val fields = line.split(",")
    val stationID = fields(0)
    val entryType = fields(2)
    val temperature = fields(3).toFloat * 0.1f * (9.0f / 5.0f) + 32.0f
    (stationID, entryType, temperature)
  }
    /** Our main function where the action happens */
  def main(args: Array[String]) {
   
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)
    
    // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "MinTemperatures")
    
    // Read each line of input data
    val lines = sc.textFile("data/1800.csv")
    
    // Convert to (stationID, entryType, temperature) tuples
    val parsedLines = lines.map(parseLine)
    
    // Filter out all but TMIN entries
    val minTemps = parsedLines.filter(x => x._2 == "TMIN")
    
    // Convert to (stationID, temperature)
    val stationTemps = minTemps.map(x => (x._1, x._3.toFloat))
    
    // Reduce by stationID retaining the minimum temperature found
    val minTempsByStation = stationTemps.reduceByKey( (x,y) => min(x,y))
    
    // Collect, format, and print the results
    val results = minTempsByStation.collect()
    
    for (result <- results.sorted) {
       val station = result._1
       val temp = result._2
       val formattedTemp = f"$temp%.2f F"
       println(s"$station minimum temperature: $formattedTemp") 
    }
      
  }
}

3.1.9 Example 3 : Counting Word Occurrences using Flatmap()

package com.sundogsoftware.spark

import org.apache.spark._
import org.apache.log4j._

/** Count up how many of each word appears in a book as simply as possible. */
object WordCount {
 
  /** Our main function where the action happens */
  def main(args: Array[String]) {
   
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)
    
     // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "WordCount")   
    
    // Read each line of my book into an RDD
    val input = sc.textFile("data/book.txt")
    
    // Split into words separated by a space character
    val words = input.flatMap(x => x.split(" "))
    
    // Count up the occurrences of each word
    val wordCounts = words.countByValue()
    
    // Print the results.
    wordCounts.foreach(println)
  }
  
}

3.1.10 Example 4 : Improving the Word Count Script with Regular Expressions

package com.sundogsoftware.spark

import org.apache.spark._
import org.apache.log4j._

/** Count up how many of each word occurs in a book, using regular expressions. */
object WordCountBetter {
 
  /** Our main function where the action happens */
  def main(args: Array[String]) {
   
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)
    
     // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "WordCountBetter")   
    
    // Load each line of my book into an RDD
    val input = sc.textFile("data/book.txt")
    
    // Split using a regular expression that extracts words
    // Regular expression knows what a word is
    val words = input.flatMap(x => x.split("\\W+"))
    
    // Normalize everything to lowercase
    val lowercaseWords = words.map(x => x.toLowerCase())
    
    // Count of the occurrences of each word
    val wordCounts = lowercaseWords.countByValue()
    
    // Print the results
    wordCounts.foreach(println)
  }
}

3.1.11 Example 5 : Sorting the Word Count Results

package com.sundogsoftware.spark

import org.apache.spark._
import org.apache.log4j._

/** Count up how many of each word occurs in a book, using regular expressions and sorting the final results */
object WordCountBetterSorted {
 
  /** Our main function where the action happens */
  def main(args: Array[String]) {
   
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)
    
     // Create a SparkContext using a single thread on the local machine
     // (a single partition keeps the sorted results in order when we print them)
    val sc = new SparkContext("local", "WordCountBetterSorted")   
    
    // Load each line of my book into an RDD
    val input = sc.textFile("data/book.txt")
    
    // Split using a regular expression that extracts words
    val words = input.flatMap(x => x.split("\\W+"))
    
    // Normalize everything to lowercase
    val lowercaseWords = words.map(x => x.toLowerCase())
    
    // Count of the occurrences of each word
    val wordCounts = lowercaseWords.map(x => (x, 1)).reduceByKey( (x,y) => x + y )
    
    // Flip (word, count) tuples to (count, word) and then sort by key (the counts)
    val wordCountsSorted = wordCounts.map( x => (x._2, x._1) ).sortByKey()
    
    // Print the results, flipping the (count, word) results to word: count as we go.
    for (result <- wordCountsSorted) {
      val count = result._1
      val word = result._2
      println(s"$word: $count")
    }
    
  }
  
}

3.1.12 Example 6 : Total Amount Spent by Customer

package com.sundogsoftware.spark

import org.apache.spark._
import org.apache.log4j._

/** Compute the total amount spent per customer in some fake e-commerce data. */
object TotalSpentByCustomer {
  
  /** Convert input data to (customerID, amountSpent) tuples */
  def extractCustomerPricePairs(line: String): (Int, Float) = {
    val fields = line.split(",")
    (fields(0).toInt, fields(2).toFloat)
  }
 
  /** Our main function where the action happens */
  def main(args: Array[String]) {
   
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)
    
     // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "TotalSpentByCustomer")   
    
    val input = sc.textFile("data/customer-orders.csv")

    val mappedInput = input.map(extractCustomerPricePairs)
    
    val totalByCustomer = mappedInput.reduceByKey( (x,y) => x + y )
    
    val results = totalByCustomer.collect()
    
    // Print the results.
    results.foreach(println)
  }
  
}

3.1.13 Example 7 : Total Amount Spent by Customer, Sorted

package com.sundogsoftware.spark

import org.apache.spark._
import org.apache.log4j._

/** Compute the total amount spent per customer in some fake e-commerce data. */
object TotalSpentByCustomerSorted {
  
  /** Convert input data to (customerID, amountSpent) tuples */
  def extractCustomerPricePairs(line: String): (Int, Float) = {
    val fields = line.split(",")
    (fields(0).toInt, fields(2).toFloat)
  }
 
  /** Our main function where the action happens */
  def main(args: Array[String]) {
   
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)
    
     // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "TotalSpentByCustomerSorted")   
    
    val input = sc.textFile("data/customer-orders.csv")

    val mappedInput = input.map(extractCustomerPricePairs)
    
    val totalByCustomer = mappedInput.reduceByKey( (x,y) => x + y )
    
    val flipped = totalByCustomer.map( x => (x._2, x._1) )
    
    val totalByCustomerSorted = flipped.sortByKey()
    
    val results = totalByCustomerSorted.collect()
    
    // Print the results.
    results.foreach(println)
  }
  
}

4. SparkSQL, DataFrames, and Datasets

4.1 Intro to SparkSQL

  • Modern API for Spark (layer above Spark core)
  • DataFrames and DataSets

4.2 DataFrames

  • Contain Row objects
  • Can run SQL queries (since they have a schema)
  • Have a schema
  • Communicate with JDBC & Tableau
  • Benefit from SQL query optimization
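
A minimal, hedged sketch of those points: Row objects with a schema, exposed to SQL. The column names and data here are made up for illustration.

import org.apache.spark.sql.SparkSession

object DataFrameSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("DataFrameSketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // A DataFrame is a Dataset of Row objects, with a schema
    val df = Seq(("Picard", 59), ("Kirk", 34)).toDF("name", "age")
    df.printSchema()

    // Expose it to SQL and query it
    df.createOrReplaceTempView("captains")
    spark.sql("SELECT name FROM captains WHERE age > 40").show()

    spark.stop()
  }
}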

4.3 DataSets

  • DataSets can explicitly wrap a given struct or type
    • It knows what its columns are from the get-go
  • A DataFrame’s schema is inferred at runtime, but a DataSet’s schema is known at compile time
    • Faster detection of errors and better optimization
    • DataSets can only be used in compiled languages (Java, Scala), not in interpreted languages (Python)
    • Yet another reason to use Scala
  • RDD’s can be converted to DataSets with .toDS()
    • RDDs are still useful for some operations
  • The trend in Spark is to use RDD’s less and DataSets more
  • DataSets are more efficient
    • They can be serialized very efficiently - even better than Kryo
    • Optimal execution plans can be determined at compile time
  • DataSets allow for better interoperability
    • MLlib and Spark Streaming are moving toward using Datasets instead of RDDs for their primary API
  • DataSets simplify development
    • You can perform most SQL operations on a dataset with one line
  • DataSets are the new hotness
  • Create a SparkSession object instead of a SparkContext when using Spark SQL / DataSets
  • Spark SQL exposes a JDBC/ODBC server (if you built Spark with Hive support)
  • Start it with sbin/start-thriftserver.sh
  • Connect using beeline -u jdbc:hive2://localhost:10000
  • Voilà, you have a SQL shell to Spark SQL
  • You can create new tables, or query existing ones that were cached using hiveCtx.cacheTable(“tableName”)
// Other stuff you can do with datasets

myResultDataset.show()

myResultDataset.select("someFieldName")

myResultDataset.filter(myResultDataset("someFieldName") > 200)

myResultDataset.groupBy(myResultDataset("someFieldName")).mean()

myResultDataset.rdd.map(mapperFunction)
  • You can create user-defined functions (UDFs)
import org.apache.spark.sql.functions.{col, udf}

val square = udf((x: Int) => x * x)
val squaredDF = df.withColumn("square", square(col("value")))

4.4 Let’s play with SparkSQL, DataFrames and DataSets : Fake Friends Data

Example 1 : Fake Friend’s data and Spark SQL API

  • Case Class - compact way of defining an object
  • Constructs a virtual DB and schema for the data
  • For Datasets the schema can’t just be inferred at runtime; it needs to be known at compile time (here, via the case class)
package com.sundogsoftware.spark

import org.apache.spark.sql._
import org.apache.log4j._

object SparkSQLDataset {
  // Case Class - compact way of defining an object
  case class Person(id:Int, name:String, age:Int, friends:Int)

  /** Our main function where the action happens */
  def main(args: Array[String]) {
    
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)
    
    // Use SparkSession interface
    val spark = SparkSession
      .builder
      .appName("SparkSQL")
      .master("local[*]")
      .getOrCreate()

    // Load each line of the source data into a Dataset
    import spark.implicits._
    val schemaPeople = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("data/fakefriends.csv")
      // up to here this is a DataFrame; .as[Person] converts it into a Dataset
      // by supplying a schema at compile time (otherwise it is only inferred at runtime)
      .as[Person]

    schemaPeople.printSchema()
    
    schemaPeople.createOrReplaceTempView("people")

    val teenagers = spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
    
    val results = teenagers.collect()
    
    results.foreach(println)
    
    spark.stop()
  }
}

Example 2 : DataFrames & DataSet

package com.sundogsoftware.spark

import org.apache.spark.sql._
import org.apache.log4j._
    
object DataFramesDataset {
  
  case class Person(id:Int, name:String, age:Int, friends:Int)

  /** Our main function where the action happens */
  def main(args: Array[String]) {
    
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)
    
    // Use new SparkSession interface in Spark 2.0
    val spark = SparkSession
      .builder
      .appName("SparkSQL")
      .master("local[*]")
      .getOrCreate()

    // Convert our csv file to a DataSet, using our Person case
    // class to infer the schema.
    import spark.implicits._
    val people = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("data/fakefriends.csv")
      .as[Person]

    // There are lots of other ways to make a DataFrame.
    // For example, spark.read.json("json file path")
    // or sqlContext.table("Hive table name")
    
    println("Here is our inferred schema:")
    people.printSchema()
    
    println("Let's select the name column:")
    people.select("name").show()
    
    println("Filter out anyone over 21:")
    people.filter(people("age") < 21).show()
   
    println("Group by age:")
    people.groupBy("age").count().show()
    
    println("Make everyone 10 years older:")
    people.select(people("name"), people("age") + 10).show()
    
    spark.stop()
  }
}

Example 3 : Friend’s by Age with Spark DataSets

package com.sundogsoftware.spark

import org.apache.log4j._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/** Compute the average number of friends by age in a social network. */
object FriendsByAgeDataset {

  // Create case class with schema of fakefriends.csv
  case class FakeFriends(id: Int, name: String, age: Int, friends: Long)

  /** Our main function where the action happens */
  def main(args: Array[String]) {
   
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkSession using every core of the local machine
    val spark = SparkSession
      .builder
      .appName("FriendsByAge")
      .master("local[*]")
      .getOrCreate()

    // Load each line of the source data into a Dataset
    import spark.implicits._
    val ds = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("data/fakefriends.csv")
      .as[FakeFriends]

    // Select only the age and friends columns
    val friendsByAge = ds.select("age", "friends")

    // From friendsByAge we group by "age" and then compute average
    friendsByAge.groupBy("age").avg("friends").show()

    // Sorted:
    friendsByAge.groupBy("age").avg("friends").sort("age").show()

    // Formatted more nicely:
    friendsByAge.groupBy("age").agg(round(avg("friends"), 2))
      .sort("age").show()

    // With a custom column name:
    friendsByAge.groupBy("age").agg(round(avg("friends"), 2)
      .alias("friends_avg")).sort("age").show()
  }
}

Example 4 : Word count by regex, sorting and DataSets

package com.sundogsoftware.spark

import org.apache.log4j._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/** Count up how many of each word occurs in a book, using regular expressions and sorting the final results */
object WordCountBetterSortedDataset {

  case class Book(value: String)

  /** Our main function where the action happens */
  def main(args: Array[String]) {

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkSession using every core of the local machine
    val spark = SparkSession
      .builder
      .appName("WordCount")
      .master("local[*]")
      .getOrCreate()

    // Read each line of my book into a Dataset
    import spark.implicits._
    // Implicitly infer schema (with no header, so column name is just "value") 
    // and then convert the DataFrame to a Dataset (provide a schema at compile time)
    val input = spark.read.text("data/book.txt").as[Book]

    // Split using a regular expression that extracts words
    val words = input
      .select(explode(split($"value", "\\W+")).alias("word"))
      .filter($"word" =!= "")

    // Normalize everything to lowercase
    val lowercaseWords = words.select(lower($"word").alias("word"))

    // Count up the occurrences of each word
    val wordCounts = lowercaseWords.groupBy("word").count()

    // Sort by counts
    val wordCountsSorted = wordCounts.sort("count")

    // Show the results.
    wordCountsSorted.show(wordCountsSorted.count.toInt)


    // ANOTHER WAY TO DO IT (Blending RDD's and Datasets)
    val bookRDD = spark.sparkContext.textFile("data/book.txt")
    val wordsRDD = bookRDD.flatMap(x => x.split("\\W+"))
    val wordsDS = wordsRDD.toDS()

    val lowercaseWordsDS = wordsDS.select(lower($"value").alias("word"))
    val wordCountsDS = lowercaseWordsDS.groupBy("word").count()
    val wordCountsSortedDS = wordCountsDS.sort("count")
    wordCountsSortedDS.show(wordCountsSortedDS.count.toInt)

  }
}

Example 5 : Revisiting the Minimum Temperature

package com.sundogsoftware.spark

import org.apache.spark.sql.types.{FloatType, IntegerType, StringType, StructType}
import org.apache.log4j._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/** Find the minimum temperature by weather station */
object MinTemperaturesDataset {

  case class Temperature(stationID: String, date: Int, measure_type: String, temperature: Float)

  /** Our main function where the action happens */
  def main(args: Array[String]) {
   
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkSession using every core of the local machine
    val spark = SparkSession
      .builder
      .appName("MinTemperatures")
      .master("local[*]")
      .getOrCreate()

    val temperatureSchema = new StructType()
      .add("stationID", StringType, nullable = true)
      .add("date", IntegerType, nullable = true)
      .add("measure_type", StringType, nullable = true)
      .add("temperature", FloatType, nullable = true)

    // Read the file as dataset
    import spark.implicits._
    val ds = spark.read
      .schema(temperatureSchema)
      .csv("data/1800.csv")
      .as[Temperature]
    
    // Filter out all but TMIN entries
    val minTemps = ds.filter($"measure_type" === "TMIN")
    
    // Select only stationID and temperature
    val stationTemps = minTemps.select("stationID", "temperature")
    
    // Aggregate to find minimum temperature for every station
    val minTempsByStation = stationTemps.groupBy("stationID").min("temperature")

    // Convert temperature to fahrenheit and sort the dataset
    val minTempsByStationF = minTempsByStation
      .withColumn("temperature", round($"min(temperature)" * 0.1f * (9.0f / 5.0f) + 32.0f, 2))
      .select("stationID", "temperature").sort("temperature")

    // Collect, format, and print the results
    val results = minTempsByStationF.collect()
    
    for (result <- results) {
       val station = result(0)
       val temp = result(1).asInstanceOf[Float]
       val formattedTemp = f"$temp%.2f F"
       println(s"$station minimum temperature: $formattedTemp")
    }
  }
}

Example 6 : Total Spent by Customer

package com.sundogsoftware.spark

import org.apache.log4j._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{round, sum}
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}

/** Compute the total amount spent per customer in some fake e-commerce data. */
object TotalSpentByCustomerSortedDataset {

  case class CustomerOrders(cust_id: Int, item_id: Int, amount_spent: Double)

  /** Our main function where the action happens */
  def main(args: Array[String]) {
   
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder
      .appName("TotalSpentByCustomer")
      .master("local[*]")
      .getOrCreate()

    // Create schema when reading customer-orders
    val customerOrdersSchema = new StructType()
      .add("cust_id", IntegerType,nullable = true)
      .add("item_id", IntegerType,nullable = true)
      .add("amount_spent", DoubleType,nullable = true)

    // Load up the data into spark dataset
    // Use the default separator (,), load the schema from customerOrdersSchema, and read it as a Dataset of the case class
    import spark.implicits._
    val customerDS = spark.read
      .schema(customerOrdersSchema)
      .csv("data/customer-orders.csv")
      .as[CustomerOrders]

    val totalByCustomer = customerDS
      .groupBy("cust_id")
      .agg(round(sum("amount_spent"), 2)
        .alias("total_spent"))

    val totalByCustomerSorted = totalByCustomer.sort("total_spent")
    
    totalByCustomerSorted.show(totalByCustomer.count.toInt)
  }
  
}

5. Advanced Examples of Spark Programs

Example 1 : Most Popular Movies (MovieLens data - userID, movieID, rating, timestamp)

package com.sundogsoftware.spark

import org.apache.log4j._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, LongType, StructType}

/** Find the movies with the most ratings. */
object PopularMoviesDataset {

  // Case class so we can get a column name for our movie ID
  final case class Movie(movieID: Int)

  /** Our main function where the action happens */
  def main(args: Array[String]) {
   
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)
    
    // Use new SparkSession interface in Spark 2.0
    val spark = SparkSession
      .builder
      .appName("PopularMovies")
      .master("local[*]")
      .getOrCreate()

    // Create schema when reading u.data
    val moviesSchema = new StructType()
      .add("userID", IntegerType, nullable = true)
      .add("movieID", IntegerType, nullable = true)
      .add("rating", IntegerType, nullable = true)
      .add("timestamp", LongType, nullable = true)

    import spark.implicits._

    // Load up movie data as dataset
    val moviesDS = spark.read
      .option("sep", "\t")
      .schema(moviesSchema)
      .csv("data/ml-100k/u.data")
      .as[Movie]
    
    // Some SQL-style magic to sort all movies by popularity in one line!
    val topMovieIDs = moviesDS.groupBy("movieID").count().orderBy(desc("count"))

    // Grab the top 10
    topMovieIDs.show(10)

    // Stop the session
    spark.stop()
  }
  
}

Example 2 : Use Broadcast Variables & Spark UDF to Display Movie Names

package com.sundogsoftware.spark

import org.apache.log4j._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{IntegerType, LongType, StructType}

import scala.io.{Codec, Source}

/** Find the movies with the most ratings. */
object PopularMoviesNicerDataset {

  case class Movies(userID: Int, movieID: Int, rating: Int, timestamp: Long)

  /** Load up a Map of movie IDs to movie names. */
  def loadMovieNames() : Map[Int, String] = {

    // Handle character encoding issues:
    implicit val codec: Codec = Codec("ISO-8859-1") // This is the current encoding of u.item, not UTF-8.

    // Create a Map of Ints to Strings, and populate it from u.item.
    var movieNames:Map[Int, String] = Map()

    val lines = Source.fromFile("data/ml-100k/u.item")
    for (line <- lines.getLines()) {
      val fields = line.split('|')
      if (fields.length > 1) {
        movieNames += (fields(0).toInt -> fields(1))
      }
    }
    lines.close()

    movieNames
  }

  /** Our main function where the action happens */
  def main(args: Array[String]) {
   
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkSession using every core of the local machine
    val spark = SparkSession
      .builder
      .appName("PopularMoviesNicer")
      .master("local[*]")
      .getOrCreate()

    val nameDict = spark.sparkContext.broadcast(loadMovieNames())

    // Create schema when reading u.data
    val moviesSchema = new StructType()
      .add("userID", IntegerType, nullable = true)
      .add("movieID", IntegerType, nullable = true)
      .add("rating", IntegerType, nullable = true)
      .add("timestamp", LongType, nullable = true)

    // Load up movie data as dataset
    import spark.implicits._
    val movies = spark.read
      .option("sep", "\t")
      .schema(moviesSchema)
      .csv("data/ml-100k/u.data")
      .as[Movies]

    // Get number of reviews per movieID
    val movieCounts = movies.groupBy("movieID").count()

    // Create a user-defined function to look up movie names from our
    // shared map variable.

    // We start by declaring an "anonymous function" in Scala
    val lookupName : Int => String = (movieID:Int)=>{
      nameDict.value(movieID)
    }

    // Then wrap it with a udf
    val lookupNameUDF = udf(lookupName)

    // Add a movieTitle column using our new udf
    val moviesWithNames = movieCounts.withColumn("movieTitle", lookupNameUDF(col("movieID")))

    // Sort the results
    val sortedMoviesWithNames = moviesWithNames.sort("count")

    // Show the results without truncating it
    sortedMoviesWithNames.show(sortedMoviesWithNames.count.toInt, truncate = false)
  }
}
+-------+-----+---------------------------------------------------------------------------------+
|movieID|count|movieTitle                                                                       |
+-------+-----+---------------------------------------------------------------------------------+
|121    |429  |Independence Day (ID4) (1996)                                                    |
|300    |431  |Air Force One (1997)                                                             |
|1      |452  |Toy Story (1995)                                                                 |
|288    |478  |Scream (1996)                                                                    |
|286    |481  |English Patient, The (1996)                                                      |
|294    |485  |Liar Liar (1997)                                                                 |
|181    |507  |Return of the Jedi (1983)                                                        |
|100    |508  |Fargo (1996)                                                                     |
|258    |509  |Contact (1997)                                                                   |
|50     |583  |Star Wars (1977)                                                                 |
+-------+-----+---------------------------------------------------------------------------------+

Example 3 : Most Popular Superhero in a Social Graph - with DataSet

package com.sundogsoftware.spark

import org.apache.log4j._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

/** Find the superhero with the most co-appearances. */
object MostPopularSuperheroDataset {

  case class SuperHeroNames(id: Int, name: String)
  case class SuperHero(value: String)
 
  /** Our main function where the action happens */
  def main(args: Array[String]) {
   
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkSession using every core of the local machine
    val spark = SparkSession
      .builder
      .appName("MostPopularSuperhero")
      .master("local[*]")
      .getOrCreate()

    // Create schema when reading Marvel-names.txt
    val superHeroNamesSchema = new StructType()
      .add("id", IntegerType, nullable = true)
      .add("name", StringType, nullable = true)

    // Build up a hero ID -> name Dataset
    import spark.implicits._
    val names = spark.read
      .schema(superHeroNamesSchema)
      .option("sep", " ")
      .csv("data/Marvel-names.txt")
      .as[SuperHeroNames]

    val lines = spark.read
      .text("data/Marvel-graph.txt")
      .as[SuperHero]

    val connections = lines
      .withColumn("id", split(col("value"), " ")(0))
      .withColumn("connections", size(split(col("value"), " ")) - 1)
      .groupBy("id").agg(sum("connections").alias("connections"))

    val mostPopular = connections
        .sort($"connections".desc)
        .first()

    val mostPopularName = names
      .filter($"id" === mostPopular(0))
      .select("name")
      .first()

    println(s"${mostPopularName(0)} is the most popular superhero with ${mostPopular(1)} co-appearances.")
  }
}

CAPTAIN AMERICA is the most popular superhero with 1937 co-appearances.

with RDD

package com.sundogsoftware.spark

import org.apache.spark._
import org.apache.log4j._

/** Find the superhero with the most co-appearances. */
object MostPopularSuperhero {
  
  // Function to extract the hero ID and number of connections from each line
  def countCoOccurrences(line: String): (Int, Int) = {
    val elements = line.split("\\s+")
    ( elements(0).toInt, elements.length - 1 )
  }
  
  // Function to extract hero ID -> hero name tuples (or None in case of failure)
  def parseNames(line: String) : Option[(Int, String)] = {
    val fields = line.split('\"')
    if (fields.length > 1) {
      Some(fields(0).trim().toInt, fields(1))
    } else None // flatmap will just discard None results, and extract data from Some results.
  }
 
  /** Our main function where the action happens */
  def main(args: Array[String]) {
   
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)
    
     // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "MostPopularSuperhero")   
    
    // Build up a hero ID -> name RDD
    val names = sc.textFile("data/marvel-names.txt")
    val namesRdd = names.flatMap(parseNames)
    
    // Load up the superhero co-appearance data
    val lines = sc.textFile("data/marvel-graph.txt")
    
    // Convert to (heroID, number of connections) RDD
    val pairings = lines.map(countCoOccurrences)
    
    // Combine entries that span more than one line
    val totalFriendsByCharacter = pairings.reduceByKey( (x,y) => x + y )
    
    // Flip it to # of connections, hero ID
    val flipped = totalFriendsByCharacter.map( x => (x._2, x._1) )
    
    // Find the max # of connections
    val mostPopular = flipped.max()
    
    // Look up the name (lookup returns an array of results, so we need to access the first result with (0)).
    val mostPopularName = namesRdd.lookup(mostPopular._2).head
    
    // Print out our answer!
    println(s"$mostPopularName is the most popular superhero with ${mostPopular._1} co-appearances.") 
  }
  
}

Example 5 : Most Obscure Superhero in a Social Graph

package com.sundogsoftware.spark

import org.apache.log4j._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

/** Find the most obscure superheroes - the ones with the fewest co-appearances. */
object MostObscureSuperheroDataset {

  case class SuperHeroNames(id: Int, name: String)
  case class SuperHero(value: String)
 
  /** Our main function where the action happens */
  def main(args: Array[String]) {
   
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkSession using every core of the local machine
    val spark = SparkSession
      .builder
      .appName("MostPopularSuperhero")
      .master("local[*]")
      .getOrCreate()

    // Create schema when reading Marvel-names.txt
    val superHeroNamesSchema = new StructType()
      .add("id", IntegerType, nullable = true)
      .add("name", StringType, nullable = true)

    // Build up a hero ID -> name Dataset
    import spark.implicits._
    val names = spark.read
      .schema(superHeroNamesSchema)
      .option("sep", " ")
      .csv("data/Marvel-names.txt")
      .as[SuperHeroNames]

    val lines = spark.read
      .text("data/Marvel-graph.txt")
      .as[SuperHero]

    val connections = lines
      .withColumn("id", split(col("value"), " ")(0))
      .withColumn("connections", size(split(col("value"), " ")) - 1)
      .groupBy("id").agg(sum("connections").alias("connections"))

    val minConnectionCount = connections.agg(min("connections")).first().getLong(0)

    val minConnections = connections.filter($"connections" === minConnectionCount)
    
    val minConnectionsWithNames = minConnections.join(names, "id")

    println("The following characters have only " + minConnectionCount + " connection(s):")
    
    minConnectionsWithNames.select("name").show()
  }
}

Example 6 : Superhero Degrees of Separation (Breadth-First Search) - with RDD

package com.sundogsoftware.spark

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.util.LongAccumulator
import org.apache.log4j._
import scala.collection.mutable.ArrayBuffer

/** Finds the degrees of separation between two Marvel comic book characters, based
 *  on co-appearances in a comic.
 */
object DegreesOfSeparation {
  
  // The characters we want to find the separation between.
  val startCharacterID = 5306 //SpiderMan
  val targetCharacterID = 14 //ADAM 3,031 (who?)
  
  // We make our accumulator a "global" Option so we can reference it in a mapper later.
  var hitCounter:Option[LongAccumulator] = None
  
  // Some custom data types 
  // BFSData contains an array of hero ID connections, the distance, and color.
  type BFSData = (Array[Int], Int, String)
  // A BFSNode has a heroID and the BFSData associated with it.
  type BFSNode = (Int, BFSData)
    
  /** Converts a line of raw input into a BFSNode */
  def convertToBFS(line: String): BFSNode = {
    
    // Split up the line into fields
    val fields = line.split("\\s+")
    
    // Extract this hero ID from the first field
    val heroID = fields(0).toInt
    
    // Extract subsequent hero ID's into the connections array
    var connections: ArrayBuffer[Int] = ArrayBuffer()
    for ( connection <- 1 until fields.length) {
      connections += fields(connection).toInt
    }
    
    // Default distance and color is 9999 and white
    var color:String = "WHITE"
    var distance:Int = 9999
    
    // Unless this is the character we're starting from
    if (heroID == startCharacterID) {
      color = "GRAY"
      distance = 0
    }
    
    (heroID, (connections.toArray, distance, color))
  }
  
  /** Create "iteration 0" of our RDD of BFSNodes */
  def createStartingRdd(sc:SparkContext): RDD[BFSNode] = {
    val inputFile = sc.textFile("data/marvel-graph.txt")
    inputFile.map(convertToBFS)
  }
  
  /** Expands a BFSNode into this node and its children */
  def bfsMap(node:BFSNode): Array[BFSNode] = {
    
    // Extract data from the BFSNode
    val characterID:Int = node._1
    val data:BFSData = node._2
    
    val connections:Array[Int] = data._1
    val distance:Int = data._2
    var color:String = data._3
    
    // This is called from flatMap, so we return an array
    // of potentially many BFSNodes to add to our new RDD
    var results:ArrayBuffer[BFSNode] = ArrayBuffer()
    
    // Gray nodes are flagged for expansion, and create new
    // gray nodes for each connection
    if (color == "GRAY") {
      for (connection <- connections) {
        val newCharacterID = connection
        val newDistance = distance + 1
        val newColor = "GRAY"
        
        // Have we stumbled across the character we're looking for?
        // If so increment our accumulator so the driver script knows.
        if (targetCharacterID == connection) {
          if (hitCounter.isDefined) {
            hitCounter.get.add(1)
          }
        }
        
        // Create our new Gray node for this connection and add it to the results
        val newEntry:BFSNode = (newCharacterID, (Array(), newDistance, newColor))
        results += newEntry
      }
      
      // Color this node as black, indicating it has been processed already.
      color = "BLACK"
    }
    
    // Add the original node back in, so its connections can get merged with 
    // the gray nodes in the reducer.
    val thisEntry:BFSNode = (characterID, (connections, distance, color))
    results += thisEntry
    
    results.toArray
  }
  
  /** Combine nodes for the same heroID, preserving the shortest length and darkest color. */
  def bfsReduce(data1:BFSData, data2:BFSData): BFSData = {
    
    // Extract data that we are combining
    val edges1:Array[Int] = data1._1
    val edges2:Array[Int] = data2._1
    val distance1:Int = data1._2
    val distance2:Int = data2._2
    val color1:String = data1._3
    val color2:String = data2._3
    
    // Default node values
    var distance:Int = 9999
    var color:String = "WHITE"
    var edges:ArrayBuffer[Int] = ArrayBuffer()
    
    // See if one is the original node with its connections.
    // If so preserve them.
    if (edges1.length > 0) {
      edges ++= edges1
    }
    if (edges2.length > 0) {
      edges ++= edges2
    }
    
    // Preserve minimum distance
    if (distance1 < distance) {
      distance = distance1
    }
    if (distance2 < distance) {
      distance = distance2
    }
    
    // Preserve darkest color
    if (color1 == "WHITE" && (color2 == "GRAY" || color2 == "BLACK")) {
      color = color2
    }
    if (color1 == "GRAY" && color2 == "BLACK") {
      color = color2
    }
    if (color2 == "WHITE" && (color1 == "GRAY" || color1 == "BLACK")) {
      color = color1
    }
    if (color2 == "GRAY" && color1 == "BLACK") {
      color = color1
    }
	if (color1 == "GRAY" && color2 == "GRAY") {
	  color = color1
	}
	if (color1 == "BLACK" && color2 == "BLACK") {
	  color = color1
	}
    
    (edges.toArray, distance, color)
  }
    
  /** Our main function where the action happens */
  def main(args: Array[String]) {
   
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)
    
     // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "DegreesOfSeparation") 
    
    // Our accumulator, used to signal when we find the target 
    // character in our BFS traversal.
    hitCounter = Some(sc.longAccumulator("Hit Counter"))
    
    var iterationRdd = createStartingRdd(sc)

    for (iteration <- 1 to 10) {
      println("Running BFS Iteration# " + iteration)
   
      // Create new vertices as needed to darken or reduce distances in the
      // reduce stage. If we encounter the node we're looking for as a GRAY
      // node, increment our accumulator to signal that we're done.
      val mapped = iterationRdd.flatMap(bfsMap)
      
      // Note that mapped.count() action here forces the RDD to be evaluated, and
      // that's the only reason our accumulator is actually updated.  
      println("Processing " + mapped.count() + " values.")
      
      if (hitCounter.isDefined) {
        val hitCount = hitCounter.get.value
        if (hitCount > 0) {
          println("Hit the target character! From " + hitCount + 
              " different direction(s).")
          return
        }
      }
      
      // Reducer combines data for each character ID, preserving the darkest
      // color and shortest path.      
      iterationRdd = mapped.reduceByKey(bfsReduce)
    }
  }
}

with DataSet

package com.sundogsoftware.spark

import org.apache.spark.sql.functions._
import org.apache.log4j._
import org.apache.spark.sql.{Dataset, SparkSession}

/** Finds the degrees of separation between two Marvel comic book characters, based
 *  on co-appearances in a comic.
 */
object DegreesOfSeparationDataset {

  // The characters we want to find the separation between.
  val startCharacterID = 5306 //SpiderMan
  val targetCharacterID = 14 //ADAM 3,031 (who?)

  case class SuperHero(value: String)
  case class BFSNode(id: Int, connections: Array[Int], distance: Int, color: String)

  /** Create "iteration 0" of our RDD of BFSNodes */
  def createStartingDs(spark:SparkSession): Dataset[BFSNode] = {
    import spark.implicits._
    val inputFile = spark.read
      .text("data/Marvel-graph.txt")
      .as[SuperHero]

    // Parse the data so that the first element goes into the id column and all the rest go into the connections column as an Array
    val connections = inputFile
      .withColumn("id", split(col("value"), " ")(0).cast("int"))
      .withColumn("connections", slice(split(col("value"), " "), 2, 9999).cast("array<int>"))
      .select("id", "connections")

    // Add distance and color columns
    val result = connections
      .withColumn("distance",
        when(col("id") === startCharacterID,0)
          .when(col("id") =!= startCharacterID,9999))
      .withColumn("color",
        when(col("id") === startCharacterID,"GRAY")
          .when(col("id") =!= startCharacterID,"WHITE")).as[BFSNode]

    result
  }

  def exploreNode(spark:SparkSession, ds: Dataset[BFSNode], iteration: Int): (Dataset[BFSNode], Long) = {
    import spark.implicits._
    // Get all nodes which need to be explored
    val rawExploreDS = ds
      .filter($"color" === "GRAY")
      .select($"id", explode($"connections").alias("child")).distinct()

    val hitCount = rawExploreDS.filter($"child" === targetCharacterID).count()
    val exploreDS = rawExploreDS.distinct().select("child")

    // All parents become explored after building exploreDS, so we mark them as "BLACK"
    val exploring = ds
      .withColumn("color",
        when(col("color") === "GRAY","BLACK")
          .otherwise($"color")).as[BFSNode]

    // Mark all explored nodes on this iteration which were not previously explored and set distance
    val result = exploring
      .join(exploreDS, exploring("color") === "WHITE" && exploring("id") === exploreDS("child"), "leftouter")
      .withColumn("distance",
        when(col("child").isNotNull, iteration)
          .otherwise($"distance"))
      .withColumn("color",
        when(col("color") === "WHITE" && col("child").isNotNull, "GRAY")
          .otherwise($"color"))
      .select("id", "connections", "distance", "color").as[BFSNode]

    (result, hitCount)
  }
  /** Our main function where the action happens */
  def main(args: Array[String]) {

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkSession using every core of the local machine
    val spark = SparkSession
      .builder
      .appName("DegreesOfSeparation")
      .master("local[*]")
      .getOrCreate()

    // Keeps track of how many times we hit the target character during our BFS traversal.
    var hitCount: Long = 0

    // Build dataset
    var iterationDs = createStartingDs(spark)

    for (iteration <- 1 to 10) {
      println("Running BFS Iteration# " + iteration)
      val resultExplore = exploreNode(spark, iterationDs, iteration)
      iterationDs = resultExplore._1
      hitCount += resultExplore._2

      if (hitCount > 0) {
        println("Hit the target character! From " + hitCount +
          " different direction(s).")
        return
      }
    }
  }
}

Example 7 : Item-Based Collaborative Filtering

Finding similar movies using Spark and the MovieLens data set

Caching or Persisting DataSets (or RDD / DataFrame)

  • Any time you will perform more than one action on a dataset, you should cache it!
    • Otherwise, Spark might re-evaluate the entire dataset all over again!
  • Use .cache() or .persist() to do this.
    • persist() optionally lets you cache to disk instead of just memory, in case a node fails or something.
  • cache() is an Apache Spark transformation that can be used on a DataFrame, Dataset, or RDD when you want to perform more than one action
  • cache() stores the specified DataFrame, Dataset, or RDD in the memory of your cluster’s workers
  • cache() and persist() are used to keep intermediate results of an RDD, DataFrame, or Dataset around; you mark one for persistence by calling persist() or cache() on it (see the sketch below)
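
A minimal sketch of the difference between the two calls (the object name and numbers are just for illustration):

package com.sundogsoftware.spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object CachingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("CachingSketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // Stand-in for some expensive-to-compute Dataset
    val ds = (1 to 100000).toDS().map(x => x.toLong * x)

    // cache() keeps the result around after the first action computes it
    // (MEMORY_AND_DISK by default for Datasets/DataFrames, MEMORY_ONLY for RDDs)
    ds.cache()

    // persist() lets you pick the storage level explicitly instead, e.g.:
    // ds.persist(StorageLevel.MEMORY_AND_DISK)

    // Both of these actions reuse the cached data instead of recomputing the whole lineage
    println(ds.count())
    println(ds.reduce(_ + _))

    // Release the cache when you're done with it
    ds.unpersist()
    spark.stop()
  }
}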

with DataSet

package com.sundogsoftware.spark

import org.apache.spark.sql.functions._
import org.apache.log4j._

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}


object MovieSimilaritiesDataset {

  case class Movies(userID: Int, movieID: Int, rating: Int, timestamp: Long)
  case class MoviesNames(movieID: Int, movieTitle: String)
  case class MoviePairs(movie1: Int, movie2: Int, rating1: Int, rating2: Int)
  case class MoviePairsSimilarity(movie1: Int, movie2: Int, score: Double, numPairs: Long)

  def computeCosineSimilarity(spark: SparkSession, data: Dataset[MoviePairs]): Dataset[MoviePairsSimilarity] = {
    // Compute xx, xy and yy columns
    val pairScores = data
      .withColumn("xx", col("rating1") * col("rating1"))
      .withColumn("yy", col("rating2") * col("rating2"))
      .withColumn("xy", col("rating1") * col("rating2"))

    // Compute numerator, denominator and numPairs columns
    val calculateSimilarity = pairScores
      .groupBy("movie1", "movie2")
      .agg(
        sum(col("xy")).alias("numerator"),
        (sqrt(sum(col("xx"))) * sqrt(sum(col("yy")))).alias("denominator"),
        count(col("xy")).alias("numPairs")
      )

    // Calculate score and select only needed columns (movie1, movie2, score, numPairs)
    import spark.implicits._
    val result = calculateSimilarity
      .withColumn("score",
        when(col("denominator") =!= 0, col("numerator") / col("denominator"))
          .otherwise(null)
      ).select("movie1", "movie2", "score", "numPairs").as[MoviePairsSimilarity]

    result
  }

  /** Get movie name by given movie id */
  def getMovieName(movieNames: Dataset[MoviesNames], movieId: Int): String = {
    val result = movieNames.filter(col("movieID") === movieId)
      .select("movieTitle").collect()(0)

    result(0).toString
  }
  /** Our main function where the action happens */
  def main(args: Array[String]) {
    
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkSession using every core of the local machine
    val spark = SparkSession
      .builder
      .appName("MovieSimilarities")
      .master("local[*]")
      .getOrCreate()

    // Create schema when reading u.item
    val moviesNamesSchema = new StructType()
      .add("movieID", IntegerType, nullable = true)
      .add("movieTitle", StringType, nullable = true)

    // Create schema when reading u.data
    val moviesSchema = new StructType()
      .add("userID", IntegerType, nullable = true)
      .add("movieID", IntegerType, nullable = true)
      .add("rating", IntegerType, nullable = true)
      .add("timestamp", LongType, nullable = true)

    println("\nLoading movie names...")
    import spark.implicits._
    // Create a broadcast dataset of movieID and movieTitle.
    // Apply ISO-8859-1 charset
    val movieNames = spark.read
      .option("sep", "|")
      .option("charset", "ISO-8859-1")
      .schema(moviesNamesSchema)
      .csv("data/ml-100k/u.item")
      .as[MoviesNames]

    // Load up movie data as dataset
    val movies = spark.read
      .option("sep", "\t")
      .schema(moviesSchema)
      .csv("data/ml-100k/u.data")
      .as[Movies]

    val ratings = movies.select("userId", "movieId", "rating")

    // Emit every movie rated together by the same user.
    // Self-join to find every combination.
    // Select movie pairs and rating pairs
    val moviePairs = ratings.as("ratings1")
      .join(ratings.as("ratings2"), $"ratings1.userId" === $"ratings2.userId" && $"ratings1.movieId" < $"ratings2.movieId")
      .select($"ratings1.movieId".alias("movie1"),
        $"ratings2.movieId".alias("movie2"),
        $"ratings1.rating".alias("rating1"),
        $"ratings2.rating".alias("rating2")
      ).as[MoviePairs]

    val moviePairSimilarities = computeCosineSimilarity(spark, moviePairs).cache()

    if (args.length > 0) {
      val scoreThreshold = 0.97
      val coOccurrenceThreshold = 50.0

      val movieID: Int = args(0).toInt

      // Filter for movies with this sim that are "good" as defined by
      // our quality thresholds above
      val filteredResults = moviePairSimilarities.filter(
        (col("movie1") === movieID || col("movie2") === movieID) &&
          col("score") > scoreThreshold && col("numPairs") > coOccurrenceThreshold)

      // Sort by quality score.
      val results = filteredResults.sort(col("score").desc).take(10)

      println("\nTop 10 similar movies for " + getMovieName(movieNames, movieID))
      for (result <- results) {
        // Display the similarity result that isn't the movie we're looking at
        var similarMovieID = result.movie1
        if (similarMovieID == movieID) {
          similarMovieID = result.movie2
        }
        println(getMovieName(movieNames, similarMovieID) + "\tscore: " + result.score + "\tstrength: " + result.numPairs)
      }
    }
  }
}

Improve the results! Some ideas to try:

  • Discard bad ratings - only recommend good movies
  • Try different similarity metrics (Pearson Correlation Coefficient, Jaccard Coefficient, Conditional Probability) - see the sketch after this list
  • Adjust the threshold for minimum co-raters or minimum score
  • Invent a new similarity metric that takes the number of co-raters into account
  • Use genre info in u.item to boost scores from movies in the same genre
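
For example, here is a rough sketch of a Jaccard-style score, assuming the ratings and moviePairs Datasets and the org.apache.spark.sql.functions._ import from the example above are in scope (just one possible approach, not the course's implementation):

// Number of users who rated each movie
val ratersPerMovie = ratings.groupBy("movieId").count()

// Number of users who rated both movies of a pair (the intersection)
val pairCounts = moviePairs.groupBy("movie1", "movie2")
  .count()
  .withColumnRenamed("count", "intersection")

// Jaccard coefficient = |A intersect B| / |A union B|
val jaccardScores = pairCounts
  .join(ratersPerMovie.withColumnRenamed("movieId", "movie1").withColumnRenamed("count", "raters1"), "movie1")
  .join(ratersPerMovie.withColumnRenamed("movieId", "movie2").withColumnRenamed("count", "raters2"), "movie2")
  .withColumn("score",
    col("intersection") / (col("raters1") + col("raters2") - col("intersection")))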

6. Running Spark on a Cluster

6.1 Using spark-submit to run Spark driver scripts

  • spark-submit’s job is to take the JAR file that contains your compiled Spark application and distribute it across the whole cluster

6.1.1 Using EMR, tuning performance on a cluster

6.1.2 Packaging and deploying your application


6.2 What is SBT (Scala Build Tool)?

  • Like Maven for Scala
  • Manages your library dependency tree for you
  • Can package up all of your dependencies into a self-contained JAR
  • If you have many dependencies, it makes life a lot easier than passing a ton of --jars options (see the build.sbt sketch below)
  • Get it from scala-sbt.org
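
A minimal build.sbt sketch for building a self-contained (assembly) JAR; the versions shown are illustrative assumptions, not taken from the course:

name := "SparkExamples"
version := "1.0"
scalaVersion := "2.12.15"

// Spark is marked "provided" because the cluster already has it; it won't be bundled into the JAR
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.3.0" % "provided",
  "org.apache.spark" %% "spark-sql"  % "3.3.0" % "provided"
)

With the sbt-assembly plugin added in project/assembly.sbt (addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")), running sbt assembly produces a single JAR you can hand to spark-submit.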

6.3 Cluster Manager

  • Spark’s built-in cluster manager
  • Hadoop’s YARN cluster manager

6.4 Spark-Submit Parameters

  • --master (yarn)
  • --num-executors
  • --executor-memory
  • --total-executor-cores
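
For example, submitting one of the earlier examples to a YARN cluster might look like this (the class name, JAR name, and sizes are placeholders; --total-executor-cores applies to Spark's standalone cluster manager rather than YARN):

spark-submit \
  --class com.sundogsoftware.spark.MovieSimilaritiesDataset \
  --master yarn \
  --num-executors 10 \
  --executor-memory 2g \
  MovieSimilarities.jar 50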

6.5 .repartition(numPartitions=100) on DataFrame or .partitionBy() on RDD

  • At least as many partitions as you have executors (see the sketch below)
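
A minimal Scala sketch of both calls (the object name and partition count are just for illustration; the right number depends on your cluster):

package com.sundogsoftware.spark

import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

object RepartitionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("RepartitionSketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // DataFrame / Dataset: shuffle the data into 100 partitions
    val df = (1 to 1000).toDF("n")
    val repartitionedDF = df.repartition(100)
    println(repartitionedDF.rdd.getNumPartitions)

    // RDD of key/value pairs: partitionBy takes an explicit Partitioner
    val rdd = spark.sparkContext.parallelize(1 to 1000).map(x => (x % 10, x))
    val partitionedRDD = rdd.partitionBy(new HashPartitioner(100))
    println(partitionedRDD.getNumPartitions)

    spark.stop()
  }
}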

Spark Optimization

  • Aim for fewer stages and less data re-shuffling

7. Machine Learning with Spark ML (uses DataFrame)

  • Resources
    • Advanced Analytics with Spark
    • Examples in the Spark SDK
    • General ML courses

7.1 ML Capabilities

  • Feature extraction
    • Term Frequency / Inverse Document Frequency useful for search
  • Basic statistics
    • Chi-squared test, Pearson or Spearman correlation, min, max, mean, variance
  • Linear regression, logistic regression
  • Support Vector Machines
  • Naive Bayes classifier
  • Decision trees
  • K-Means clustering
  • Principal component analysis, singular value decomposition
  • Recommendations using Alternating Least Squares

7.2 Movie recommendations using ALS from Spark’s ML library

package com.sundogsoftware.spark

import org.apache.log4j._
import org.apache.spark.ml.recommendation._
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}

import scala.collection.mutable

object MovieRecommendationsALSDataset {

  case class MoviesNames(movieId: Int, movieTitle: String)
  // Row format to feed into ALS
  case class Rating(userID: Int, movieID: Int, rating: Float)

  // Get movie name by given dataset and id
  def getMovieName(movieNames: Array[MoviesNames], movieId: Int): String = {
    val result = movieNames.filter(_.movieId == movieId)(0)

    result.movieTitle
  }
  /** Our main function where the action happens */
  def main(args: Array[String]) {
    
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)
    
    // Make a session
    val spark = SparkSession
      .builder
      .appName("ALSExample")
      .master("local[*]")
      .getOrCreate()

    
    println("Loading movie names...")
    // Create schema when reading u.item
    val moviesNamesSchema = new StructType()
      .add("movieID", IntegerType, nullable = true)
      .add("movieTitle", StringType, nullable = true)

    // Create schema when reading u.data
    val moviesSchema = new StructType()
      .add("userID", IntegerType, nullable = true)
      .add("movieID", IntegerType, nullable = true)
      .add("rating", IntegerType, nullable = true)
      .add("timestamp", LongType, nullable = true)

    import spark.implicits._
    // Create a broadcast dataset of movieID and movieTitle.
    // Apply ISO-8859-1 charset
    val names = spark.read
      .option("sep", "|")
      .option("charset", "ISO-8859-1")
      .schema(moviesNamesSchema)
      .csv("data/ml-100k/u.item")
      .as[MoviesNames]

    val namesList = names.collect()

    // Load up movie data as dataset
    val ratings = spark.read
      .option("sep", "\t")
      .schema(moviesSchema)
      .csv("data/ml-100k/u.data")
      .as[Rating]
    
    // Build the recommendation model using Alternating Least Squares
    println("\nTraining recommendation model...")
    
    val als = new ALS()
      .setMaxIter(5)
      .setRegParam(0.01)
      .setUserCol("userID")
      .setItemCol("movieID")
      .setRatingCol("rating")
    
    val model = als.fit(ratings)
      
    // Get top-10 recommendations for the user we specified
    val userID:Int = args(0).toInt
    val users = Seq(userID).toDF("userID")
    val recommendations = model.recommendForUserSubset(users, 10)
    
    // Display them (oddly, this is the hardest part!)
    println("\nTop 10 recommendations for user ID " + userID + ":")

    for (userRecs <- recommendations) {
      val myRecs = userRecs(1) // First column is userID, second is the recs
      val temp = myRecs.asInstanceOf[mutable.WrappedArray[Row]] // Tell Scala what it is
      for (rec <- temp) {
        val movie = rec.getAs[Int](0)
        val rating = rec.getAs[Float](1)
        val movieName = getMovieName(namesList, movie)
        println(movieName, rating)
      }
    }
    
    // Stop the session
    spark.stop()

  }
}

7.3 Linear Regression with Spark.ML

package com.sundogsoftware.spark

import org.apache.spark.sql._
import org.apache.log4j._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.types._

object LinearRegressionDataFrameDataset {
  
  case class RegressionSchema(label: Double, features_raw: Double)

  /** Our main function where the action happens */
  def main(args: Array[String]) {
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder
      .appName("LinearRegressionDF")
      .master("local[*]")
      .getOrCreate()
      
    // Load up our page speed / amount spent data in the format required by MLLib
    // (which is label, vector of features)
      
    // In machine learning lingo, "label" is just the value you're trying to predict, and
    // "feature" is the data you are given to make a prediction with. So in this example
    // the "labels" are the first column of our data, and "features" are the second column.
    // You can have more than one "feature" which is why a vector is required.
    val regressionSchema = new StructType()
      .add("label", DoubleType, nullable = true)
      .add("features_raw", DoubleType, nullable = true)

    import spark.implicits._
    val dsRaw = spark.read
      .option("sep", ",")
      .schema(regressionSchema)
      .csv("data/regression.txt")
      .as[RegressionSchema]

    val assembler = new VectorAssembler().
      setInputCols(Array("features_raw")).
      setOutputCol("features")
    val df = assembler.transform(dsRaw)
        .select("label","features")
     
    // Let's split our data into training data and testing data
    val trainTest = df.randomSplit(Array(0.5, 0.5))
    val trainingDF = trainTest(0)
    val testDF = trainTest(1)
    
    // Now create our linear regression model
    val lir = new LinearRegression()
      .setRegParam(0.3) // regularization 
      .setElasticNetParam(0.8) // elastic net mixing
      .setMaxIter(100) // max iterations
      .setTol(1E-6) // convergence tolerance
    
    // Train the model using our training data
    val model = lir.fit(trainingDF)
    
    // Now see if we can predict values in our test data.
    // Generate predictions using our linear regression model for all features in our 
    // test dataframe:
    val fullPredictions = model.transform(testDF).cache()
    
    // This basically adds a "prediction" column to our testDF dataframe.
    
    // Extract the predictions and the "known" correct labels.
    val predictionAndLabel = fullPredictions.select("prediction", "label").collect()
    
    // Print out the predicted and actual values for each point
    for (prediction <- predictionAndLabel) {
      println(prediction)
    }
    
    // Stop the session
    spark.stop()

  }
}

7.4 Predict Real Estate with Decision Trees in Spark.ML
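
This exercise is otherwise empty in these notes; below is a minimal sketch of what a decision-tree version could look like with Spark ML's DecisionTreeRegressor. The data/realestate.csv path and column names are assumptions for illustration, not taken verbatim from the course material.

package com.sundogsoftware.spark

import org.apache.log4j._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.sql.SparkSession

object RealEstateDecisionTree {

  def main(args: Array[String]): Unit = {

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder
      .appName("RealEstateDecisionTree")
      .master("local[*]")
      .getOrCreate()

    // Let Spark infer the schema from the (assumed) CSV header
    val data = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("data/realestate.csv")

    // Assemble the assumed numeric feature columns into a single vector column
    val assembler = new VectorAssembler()
      .setInputCols(Array("HouseAge", "DistanceToMRT", "NumberConvenienceStores"))
      .setOutputCol("features")

    val df = assembler.transform(data)
      .select("PriceOfUnitArea", "features")

    // Split into training and test sets
    val Array(trainingDF, testDF) = df.randomSplit(Array(0.8, 0.2))

    // Decision trees don't need feature scaling or regularization parameters
    val dtr = new DecisionTreeRegressor()
      .setFeaturesCol("features")
      .setLabelCol("PriceOfUnitArea")

    val model = dtr.fit(trainingDF)

    // Predict prices for the held-out test data and show them next to the actual values
    model.transform(testDF)
      .select("prediction", "PriceOfUnitArea")
      .show(10)

    spark.stop()
  }
}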


8. Intro to Spark Streaming

  • Streaming sources of data (unlike batch data)
  • Example : log data analyzed in real time; setting up alarms
  • Sources : data fed to some port, Amazon Kinesis, HDFS, Kafka, Flume and others
  • Checkpointing : stores state to disk periodically for fault tolerance

8.1 Old API : The DStream API for Spark Streaming

  • Creates micro-batches (of, say, 1 second's worth of data) and processes each as an RDD
  • window(), reduceByWindow(), reduceByKeyAndWindow()
val hashtagCounts = hashtagKeyValues.reduceByKeyAndWindow((x,y) => x + y, (x,y) => x - y, Seconds(300), Seconds(1))

(#BoatyMcBoatFace, 2)
(#WhatIceberg, 1)

Example 1 : Twitter Streaming API and Spark DStream

package com.sundogsoftware.spark

import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._

/** Listens to a stream of Tweets and keeps track of the most popular
 *  hashtags over a 5 minute window.
 */
object PopularHashtags {
  
    /** Makes sure only ERROR messages get logged to avoid log spam. */
  def setupLogging(): Unit = {
    import org.apache.log4j.{Level, Logger}   
    val rootLogger = Logger.getRootLogger
    rootLogger.setLevel(Level.ERROR)   
  }
  
  /** Configures Twitter service credentials using twitter.txt in the main workspace directory */
  def setupTwitter(): Unit = {
    import scala.io.Source

    val lines = Source.fromFile("data/twitter.txt")
    for (line <- lines.getLines) {
      val fields = line.split(" ")
      if (fields.length == 2) {
        System.setProperty("twitter4j.oauth." + fields(0), fields(1))
      }
    }
    lines.close()
  }

  /** Our main function where the action happens */
  def main(args: Array[String]) {

    // Configure Twitter credentials using twitter.txt
    setupTwitter()
    
    // Set up a Spark streaming context named "PopularHashtags" that runs locally using
    // all CPU cores and one-second batches of data
    val ssc = new StreamingContext("local[*]", "PopularHashtags", Seconds(1))
    
    // Get rid of log spam (should be called after the context is set up)
    setupLogging()

    // Create a DStream from Twitter using our streaming context
    val tweets = TwitterUtils.createStream(ssc, None)
    
    // Now extract the text of each status update into DStreams using map()
    val statuses = tweets.map(status => status.getText)
    
    // Blow out each word into a new DStream
    val tweetwords = statuses.flatMap(tweetText => tweetText.split(" "))
    
    // Now eliminate anything that's not a hashtag
    val hashtags = tweetwords.filter(word => word.startsWith("#"))
    
    // Map each hashtag to a key/value pair of (hashtag, 1) so we can count them up by adding up the values
    val hashtagKeyValues = hashtags.map(hashtag => (hashtag, 1))
    
    // Now count them up over a 5 minute window sliding every one second
    val hashtagCounts = hashtagKeyValues.reduceByKeyAndWindow( (x,y) => x + y, (x,y) => x - y, Seconds(300), Seconds(1))
    //  You will often see this written in the following shorthand:
    //val hashtagCounts = hashtagKeyValues.reduceByKeyAndWindow( _ + _, _ -_, Seconds(300), Seconds(1))
    
    // Sort the results by the count values
    val sortedResults = hashtagCounts.transform(rdd => rdd.sortBy(x => x._2, ascending = false))
    
    // Print the top 10
    sortedResults.print
    
    // Set a checkpoint directory, and kick it all off
    // I could watch this all day!
    ssc.checkpoint("C:/checkpoint/")
    ssc.start()
    ssc.awaitTermination()
  }  
}

8.2 New API : The Structured Streaming API for Spark Streaming

  • Truly real-time: data is processed as Datasets/DataFrames
  • The Dataset just keeps getting appended to as new data arrives

Example 2 : Structured Streaming API and Logs Data

package com.sundogsoftware.spark

import org.apache.log4j._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

/** Keeps a running count of HTTP status codes from a stream of Apache access logs. */
object StructuredStreaming {


  /** Our main function where the action happens */
  def main(args: Array[String]) {
   
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder
      .appName("StructuredStreaming")
      .master("local[*]")
      .getOrCreate()

    // Streaming source that monitors the data/logs directory for text files
    val accessLines = spark.readStream.text("data/logs")

    // Regular expressions to extract pieces of Apache access log lines
    val contentSizeExp = "\\s(\\d+)$"
    val statusExp = "\\s(\\d{3})\\s"
    val generalExp = "\"(\\S+)\\s(\\S+)\\s*(\\S*)\""
    val timeExp = "\\[(\\d{2}/\\w{3}/\\d{4}:\\d{2}:\\d{2}:\\d{2} -\\d{4})]"
    val hostExp = "(^\\S+\\.[\\S+\\.]+\\S+)\\s"

    // Apply these regular expressions to create structure from the unstructured text
    val logsDF = accessLines.select(regexp_extract(col("value"), hostExp, 1).alias("host"),
    regexp_extract(col("value"), timeExp, 1).alias("timestamp"),
    regexp_extract(col("value"), generalExp, 1).alias("method"),
    regexp_extract(col("value"), generalExp, 2).alias("endpoint"),
    regexp_extract(col("value"), generalExp, 3).alias("protocol"),
    regexp_extract(col("value"), statusExp, 1).cast("Integer").alias("status"),
    regexp_extract(col("value"), contentSizeExp, 1).cast("Integer").alias("content_size"))

    // Keep a running count of status codes
    val statusCountsDF = logsDF.groupBy("status").count()

    // Display the stream to the console
    val query = statusCountsDF.writeStream.outputMode("complete").format("console").queryName("counts").start()

    // Wait until we terminate the scripts
    query.awaitTermination()

    // Stop the session
    spark.stop()
  }
  
}

Example 3 : Window Operations with Structured Streaming

package com.sundogsoftware.spark

import org.apache.log4j._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

/** Tracks the top endpoints (URLs) in a stream of Apache access logs, using windowed aggregation. */
object TopURLs {


  /** Our main function where the action happens */
  def main(args: Array[String]) {
   
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder
      .appName("StructuredStreaming")
      .master("local[*]")
      .getOrCreate()

    // Streaming source that monitors the data/logs directory for text files
    val accessLines = spark.readStream.text("data/logs")

    // Regular expressions to extract pieces of Apache access log lines
    val contentSizeExp = "\\s(\\d+)$"
    val statusExp = "\\s(\\d{3})\\s"
    val generalExp = "\"(\\S+)\\s(\\S+)\\s*(\\S*)\""
    val timeExp = "\\[(\\d{2}/\\w{3}/\\d{4}:\\d{2}:\\d{2}:\\d{2} -\\d{4})]"
    val hostExp = "(^\\S+\\.[\\S+\\.]+\\S+)\\s"

    // Apply these regular expressions to create structure from the unstructured text
    val logsDF = accessLines.select(regexp_extract(col("value"), hostExp, 1).alias("host"),
    regexp_extract(col("value"), timeExp, 1).alias("timestamp"),
    regexp_extract(col("value"), generalExp, 1).alias("method"),
    regexp_extract(col("value"), generalExp, 2).alias("endpoint"),
    regexp_extract(col("value"), generalExp, 3).alias("protocol"),
    regexp_extract(col("value"), statusExp, 1).cast("Integer").alias("status"),
    regexp_extract(col("value"), contentSizeExp, 1).cast("Integer").alias("content_size"))

    val logsDF2 = logsDF.withColumn("eventTime", current_timestamp())

    // Keep a running count of endpoints
    val endpointCounts = logsDF2.groupBy(window(col("eventTime"),
      "30 seconds", "10 seconds"), col("endpoint")).count()

    val sortedEndpointCounts = endpointCounts.orderBy(col("count").desc)

    // Display the stream to the console
    val query = sortedEndpointCounts.writeStream.outputMode("complete").format("console")
      .queryName("counts").start()

    // Wait until we terminate the scripts
    query.awaitTermination()

    // Stop the session
    spark.stop()
  }
  
}

9. GraphX

  • Introduces VertexRDD and EdgeRDD, and the Edge data type
  • Otherwise, GraphX code looks like any other Spark code for the most part

Example Uses:

  • It can measure things like “connectedness”, degree distribution, average path length, triangle counts - high-level measures of a graph
  • It can count triangles in the graph, and apply the PageRank algorithm
  • It can also join graphs together and transform graphs quickly
  • For things like our “degrees of separation” example there is no pre-built support, but GraphX does offer the Pregel API for traversing a graph

package com.sundogsoftware.spark

import org.apache.spark._
import org.apache.log4j._
import org.apache.spark.graphx._

/** Some examples of GraphX in action with the Marvel superhero dataset! */
object GraphX {
  
  // Function to extract hero ID -> hero name tuples (or None in case of failure)
  def parseNames(line: String) : Option[(VertexId, String)] = {
    val fields = line.split('\"')
    if (fields.length > 1) {
      val heroID: Long = fields(0).trim().toLong
      if (heroID < 6487) {  // ID's above 6486 aren't real characters
        return Some((heroID, fields(1)))
      }
    } 
  
    None // flatmap will just discard None results, and extract data from Some results.
  }
  
  /** Transform an input line from marvel-graph.txt into a List of Edges */
  def makeEdges(line: String) : List[Edge[Int]] = {
    import scala.collection.mutable.ListBuffer
    var edges = new ListBuffer[Edge[Int]]()
    val fields = line.split(" ")
    val origin = fields(0)
    for (x <- 1 until fields.length) {
      // Our attribute field is unused, but in other graphs could
      // be used to keep track of physical distances etc.
      edges += Edge(origin.toLong, fields(x).toLong, 0)
    }
    
    edges.toList
  }
  
  /** Our main function where the action happens */
  def main(args: Array[String]) {
    
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)
    
     // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "GraphX")
    
     // Build up our vertices
    val names = sc.textFile("data/marvel-names.txt")
    val verts = names.flatMap(parseNames)
    
    // Build up our edges
    val lines = sc.textFile("data/marvel-graph.txt")
    val edges = lines.flatMap(makeEdges)    
    
    // Build up our graph, and cache it as we're going to do a bunch of stuff with it.
    val default = "Nobody"
    val graph = Graph(verts, edges, default).cache()
    
    // Find the top 10 most-connected superheroes, using graph.degrees:
    println("\nTop 10 most-connected superheroes:")
    // The join merges the hero names into the output; sorts by total connections on each node.
    graph.degrees.join(verts).sortBy(_._2._1, ascending=false).take(10).foreach(println)


    // Now let's do Breadth-First Search using the Pregel API
    println("\nComputing degrees of separation from SpiderMan...")
    
    // Start from SpiderMan
    val root: VertexId = 5306 // SpiderMan
    
    // Initialize each node with a distance of infinity, unless it's our starting point
    val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else Double.PositiveInfinity)

    // Now the Pregel magic
    val bfs = initialGraph.pregel(Double.PositiveInfinity, 10)( 
        // Our "vertex program" preserves the shortest distance
        // between an inbound message and its current value.
        // It receives the vertex ID we are operating on,
        // the attribute already stored with the vertex, and
        // the inbound message from this iteration.
        (id, attr, msg) => math.min(attr, msg),
        
        // Our "send message" function propagates out to all neighbors
        // with the distance incremented by one.
        triplet => { 
          if (triplet.srcAttr != Double.PositiveInfinity) { 
            Iterator((triplet.dstId, triplet.srcAttr+1)) 
          } else { 
            Iterator.empty 
          } 
        }, 
        
        // The "reduce" operation preserves the minimum
        // of messages received by a vertex if multiple
        // messages are received by one vertex
        (a,b) => math.min(a,b) ).cache()
    
    // Print out the first 100 results:
    bfs.vertices.join(verts).take(100).foreach(println)
    
    // Recreate our "degrees of separation" result:
    println("\n\nDegrees from SpiderMan to ADAM 3,031")  // ADAM 3031 is hero ID 14
    bfs.vertices.filter(x => x._1 == 14).collect.foreach(println)
    
  }
}

10. More on Spark

  • Learning Spark by O’Reilly
  • Learning Scala by O’Reilly
  • Advanced Analytics with Spark by O’Reilly
  • https://spark.apache.org/
