- Advanced Spark Programming
- Apache Spark - Deployment
- Apache Spark - Core Programming
- Apache Spark - Installation
- Apache Spark - RDD
- Apache Spark - Introduction
- Apache Spark - Home
Apache Spark Useful Resources
Selected Reading
- Who is Who
- Computer Glossary
- HR Interview Questions
- Effective Resume Writing
- Questions and Answers
- UPSC IAS Exams Notes
Apache Spark - Deployment
Spark apppcation, using spark-submit, is a shell command used to deploy the Spark apppcation on a cluster. It uses all respective cluster managers through a uniform interface. Therefore, you do not have to configure your apppcation for each one.
Example
Let us take the same example of word count, we used before, using shell commands. Here, we consider the same example as a spark apppcation.
Sample Input
The following text is the input data and the file named is in.txt.
people are not as beautiful as they look, as they walk or as they talk. they are only as beautiful as they love, as they care as they share.
Look at the following program −
SparkWordCount.scala
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark._ object SparkWordCount { def main(args: Array[String]) { val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) /* local = master URL; Word Count = apppcation name; */ /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ /* Map = variables to work nodes */ /*creating an inputRDD to read text file (in.txt) through Spark context*/ val input = sc.textFile("in.txt") /* Transform the inputRDD into countRDD */ val count = input.flatMap(pne ⇒ pne.sppt(" ")) .map(word ⇒ (word, 1)) .reduceByKey(_ + _) /* saveAsTextFile method is an action that effects on the RDD */ count.saveAsTextFile("outfile") System.out.println("OK"); } }
Save the above program into a file named SparkWordCount.scala and place it in a user-defined directory named spark-apppcation.
Note − While transforming the inputRDD into countRDD, we are using flatMap() for tokenizing the pnes (from text file) into words, map() method for counting the word frequency and reduceByKey() method for counting each word repetition.
Use the following steps to submit this apppcation. Execute all steps in the spark-apppcation directory through the terminal.
Step 1: Download Spark Ja
Spark core jar is required for compilation, therefore, download spark-core_2.10-1.3.0.jar from the following pnk
and move the jar file from download directory to spark-apppcation directory.Step 2: Compile program
Compile the above program using the command given below. This command should be executed from the spark-apppcation directory. Here, /usr/local/spark/pb/spark-assembly-1.4.0-hadoop2.6.0.jar is a Hadoop support jar taken from Spark pbrary.
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/pb/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala
Step 3: Create a JAR
Create a jar file of the spark apppcation using the following command. Here, wordcount is the file name for jar file.
jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/pb/spark-assembly-1.4.0-hadoop2.6.0.jar
Step 4: Submit spark apppcation
Submit the spark apppcation using the following command −
spark-submit --class SparkWordCount --master local wordcount.jar
If it is executed successfully, then you will find the output given below. The OK letting in the following output is for user identification and that is the last pne of the program. If you carefully read the following output, you will find different things, such as −
successfully started service sparkDriver on port 42954
MemoryStore started with capacity 267.3 MB
Started SparkUI at http://192.168.1.217:4040
Added JAR file:/home/hadoop/piapppcation/count.jar
ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
Stopped Spark web UI at http://192.168.1.217:4040
MemoryStore cleared
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started 15/07/08 13:56:04 INFO Utils: Successfully started service sparkDriver on port 42954. 15/07/08 13:56:04 INFO Remoting: Remoting started; pstening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954] 15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server 15/07/08 13:56:05 INFO Utils: Successfully started service HTTP file server on port 56707. 15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040 15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapppcation/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029 15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader 15/07/08 13:56:11 INFO HadoopRDD: Input sppt: file:/home/hadoop/piapppcation/in.txt:0+54 15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable 15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11) 15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s 15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s OK 15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook 15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040 15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler 15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion. 15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared 15/07/08 13:56:14 INFO BlockManager: BlockManager stopped 15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped 15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext 15/07/08 13:56:14 INFO Utils: Shutdown hook called 15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af 15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
Step 5: Checking output
After successful execution of the program, you will find the directory named outfile in the spark-apppcation directory.
The following commands are used for opening and checking the pst of files in the outfile directory.
$ cd outfile $ ls Part-00000 part-00001 _SUCCESS
The commands for checking output in part-00000 file are −
$ cat part-00000 (people,1) (are,2) (not,1) (as,8) (beautiful,2) (they, 7) (look,1)
The commands for checking output in part-00001 file are −
$ cat part-00001 (walk, 1) (or, 1) (talk, 1) (only, 1) (love, 1) (care, 1) (share, 1)
Go through the following section to know more about the ‘spark-submit’ command.
Spark-submit Syntax
spark-submit [options] <app jar | python file> [app arguments]
Options
S.No | Option | Description |
---|---|---|
1 | --master | spark://host:port, mesos://host:port, yarn, or local. |
2 | --deploy-mode | Whether to launch the driver program locally ("cpent") or on one of the worker machines inside the cluster ("cluster") (Default: cpent). |
3 | --class | Your apppcation s main class (for Java / Scala apps). |
4 | --name | A name of your apppcation. |
5 | --jars | Comma-separated pst of local jars to include on the driver and executor classpaths. |
6 | --packages | Comma-separated pst of maven coordinates of jars to include on the driver and executor classpaths. |
7 | --repositories | Comma-separated pst of additional remote repositories to search for the maven coordinates given with --packages. |
8 | --py-files | Comma-separated pst of .zip, .egg, or .py files to place on the PYTHON PATH for Python apps. |
9 | --files | Comma-separated pst of files to be placed in the working directory of each executor. |
10 | --conf (prop=val) | Arbitrary Spark configuration property. |
11 | --properties-file | Path to a file from which to load extra properties. If not specified, this will look for conf/spark-defaults. |
12 | --driver-memory | Memory for driver (e.g. 1000M, 2G) (Default: 512M). |
13 | --driver-java-options | Extra Java options to pass to the driver. |
14 | --driver-pbrary-path | Extra pbrary path entries to pass to the driver. |
15 | --driver-class-path | Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath. |
16 | --executor-memory | Memory per executor (e.g. 1000M, 2G) (Default: 1G). |
17 | --proxy-user | User to impersonate when submitting the apppcation. |
18 | --help, -h | Show this help message and exit. |
19 | --verbose, -v | Print additional debug output. |
20 | --version | Print the version of current Spark. |
21 | --driver-cores NUM | Cores for driver (Default: 1). |
22 | --supervise | If given, restarts the driver on failure. |
23 | --kill | If given, kills the driver specified. |
24 | --status | If given, requests the status of the driver specified. |
25 | --total-executor-cores | Total cores for all executors. |
26 | --executor-cores | Number of cores per executor. (Default : 1 in YARN mode, or all available cores on the worker in standalone mode). |