Spark(3)More Examples and Deployment
1. More Spark Examples
1.1 Text Search
val file = spark.textFile("hdfs://...")
val errors = file.filter(line => line.contains("ERROR"))
// count all the errors
errors.count()
// count the errors related to MySQL
errors.filter(line => line.contains("MySQL")).count()
// show all the error messages related to MySQL
errors.filter(line => line.contains("MySQL")).collect()
1.2 In-Memory Text Search
errors.cache()
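Calling cache() marks the errors RDD to be kept in memory after the first action, so later searches reuse the filtered data instead of re-reading the file. A rough plain-Scala analogue of the difference (my own sketch, no Spark involved) is memoization:

```scala
object CacheSketch {
  var computations = 0 // counts how many times the expensive filter actually runs

  // Builds a "query" over the lines. With cached = true the filter result is
  // memoized (like errors.cache()); otherwise every call recomputes it,
  // the way an uncached RDD re-reads its input on each action.
  def makeErrors(lines: Seq[String], cached: Boolean): () => Seq[String] = {
    def compute(): Seq[String] = {
      computations += 1
      lines.filter(_.contains("ERROR"))
    }
    if (cached) {
      lazy val memo = compute() // evaluated at most once, on first use
      () => memo
    } else {
      () => compute()
    }
  }
}
```

With the cached query, repeated counts and searches only pay the filtering cost once.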
1.3 Word Count
val file = spark.textFile("hdfs://...")
val counts = file.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")
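To see what the reduceByKey(_ + _) step does, here is a plain Scala collections analogue of the same pipeline (my own sketch, no Spark needed): groupBy plays the role of the shuffle that brings equal keys together.

```scala
object LocalWordCount {
  // Plain-Scala analogue of the Spark word count: flatMap splits each line
  // into words, groupBy gathers equal words together (the role of the
  // shuffle), and the group sizes are the per-word counts that
  // reduceByKey(_ + _) would produce by summing the 1s.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size }
}
```

For example, `LocalWordCount.wordCount(Seq("to be or", "not to be"))` counts "to" and "be" twice each.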
2. More Code Examples
We can find more code examples here:
https://github.com/mesos/spark/tree/master/examples/src/main/scala/spark/examples
3. My Example
val item1 = Product(None, "CK", "good", DateTime.now)
val item2 = Product(None, "apple", "good", DateTime.now)
val item3 = Product(None, "nike", "bad", DateTime.now)
val item4 = Product(None, "cat", "bad", DateTime.now)
val products = Seq(item1, item2, item3, item4)
val name = "good"
val rdd = sc.makeRDD(products, 2)
val rdd2 = rdd.groupBy { s => s.productName == name }
val groups = rdd2.toArray()
println("rdd first array ============" + groups(0)._2)
println("rdd second array ============" + groups(1)._2)
groups(1)._2.foreach { s =>
  println("Products are good ============== " + s)
  // persist
}
sc.stop()
The output of this example will be as follows:
Products are good ============== Product(None,CK,good,2013-07-09T10:04:03.949-05:00)
Products are good ============== Product(None,apple,good,2013-07-09T10:04:04.075-05:00)
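The groupBy call keys each product by a Boolean, so at most two groups come back: the products matching the predicate (true) and the rest (false). A plain-Scala analogue of that split (my own field names; the DateTime field is dropped to keep it dependency-free):

```scala
object GroupByExample {
  case class Product(id: Option[Long], name: String, status: String)

  // Analogue of rdd.groupBy { s => s.status == status }: the Boolean key
  // splits the products into a "matching" group (true) and the rest (false).
  def splitByStatus(products: Seq[Product], status: String): Map[Boolean, Seq[Product]] =
    products.groupBy(_.status == status)
}
```

Note that with a Boolean key there is no guaranteed order between the two groups in the collected array, which is why indexing into it, as above, is fragile.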
In my real project, we can also put the DAO layer in another project, so that Spark only deals with the computation.
One problem we need to take care of:
every class we use inside a Spark closure must be serializable (i.e., extend Serializable).
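A quick way to check this requirement outside Spark is to push an instance through Java serialization yourself, which is what Spark does before shipping a closure to a worker. This sketch is my own (Scala case classes are Serializable automatically, which is why the Product class above works; the joda DateTime field is omitted here):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerializableCheck {
  case class Product(id: Option[Long], name: String, status: String)

  // Serializes an object the way Spark does when shipping closures; throws
  // java.io.NotSerializableException for classes that do not extend
  // Serializable -- the same failure Spark reports at runtime on the executor.
  def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.size
  }
}
```

Running every closure-captured class through such a check in a unit test catches serialization problems long before they show up as task failures on the cluster.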
4. Deployment of Spark - Spark Standalone Mode
4.1 Starting a Cluster Manually
>cd SPARK_HOME
>./run spark.deploy.master.Master
We can visit the UI page at http://localhost:8080/
This page shows the URL of the Spark master: spark://Carls-MacBook-Pro.local:7077
We can also change the master URL with these parameters:
>./run spark.deploy.master.Master -i localhost
The master URL will then be spark://localhost:7077.
We can also start a worker node with this command:
>./run spark.deploy.worker.Worker spark://localhost:7077
After that, we can see the worker's URL and other information on the master UI:
Carls-MacBook-Pro.local:52875
We can also define the HOST part of the worker URL with a parameter:
>./run spark.deploy.worker.Worker spark://localhost:7077 -i localhost
There are a lot of other parameters:
-i IP, --ip IP             IP address or DNS name to listen on
-p PORT, --port PORT       Port for the service to listen on
--webui-port PORT          Port for the web UI (default: 8080 for master, 8081 for worker)
-c CORES, --cores CORES    Total CPU cores to allow Spark jobs to use (worker only)
-m MEM, --memory MEM       Total amount of memory to allow Spark jobs to use, e.g. 1000M or 2G (worker only)
-d DIR, --work-dir DIR     Working directory (default: SPARK_HOME/work; worker only)
There is also another way to launch the cluster.
4.2 Cluster Launch Scripts
Prepare the server to work on.
Add a user on the remote server:
>/usr/sbin/useradd spark
Set a password:
>passwd spark
Generate the key pair on the client machine:
>ssh-keygen -t rsa
The key pair will be generated under /Users/carl/.ssh/
Place the public key on the remote server:
>su spark
>mkdir ~/.ssh
>vi ~/.ssh/authorized_keys
Paste the content of the public key here, then fix the permissions:
>chmod 711 ~/.ssh
>chmod 644 ~/.ssh/authorized_keys
On the client machine, configure SSH:
>vi ~/.ssh/config
Place something like this:
Host server1.com
    User root
    IdentityFile ~/.ssh/demo1
Host server2.com
    User spark
    IdentityFile ~/.ssh/demo2
Then we can connect to the servers directly.
The cluster scripts are as follow:
bin/start-master.sh
bin/start-slaves.sh
bin/start-all.sh
bin/stop-master.sh
bin/stop-slaves.sh
bin/stop-all.sh
These commands need to be executed on the master machine.
Check the file SPARK_HOME/conf/slaves:
there is only localhost right now. We can configure the parameters in SPARK_HOME/conf/spark-env.sh:
SCALA_HOME=/opt/scala2.10.0
SPARK_MASTER_IP=localhost
Then we can start the cluster with the command:
>bin/start-all.sh
Error Message:
Carls-MacBook-Pro:spark carl$ ssh localhost
ssh: connect to host localhost port 22: Connection refused
We get the same error message with the command:
>ssh localhost
Solution:
System Preferences -> Internet & Wireless -> Sharing -> check 'Remote Login'
System Preferences -> Internet & Wireless -> Sharing -> click 'Edit' to change the name of my MacBook
4.3 Connecting the Job to the Cluster
>MASTER=spark://IP:PORT ./spark-shell
Connecting from an application:
val sparkMaster = "spark://192.168.10.115:7070"
val sc = new SparkContext(sparkMaster,
  "Complex Job",
  "/opt/spark",
  List("target/scala-2.10/easysparkserver_2.10-1.0.jar"),
  Map())
Try to use the app to connect to the master.
Error Message:
13/07/09 13:40:55 INFO client.Client$ClientActor: Connecting to master spark://192.168.10.115:7070
13/07/09 13:40:55 ERROR client.Client$ClientActor: Connection to master failed; stopping client
Solution:
Wrong port number; change it to 7077.
Better, but we still get an error message:
13/07/09 13:44:54 INFO cluster.ClusterScheduler: Adding task set 1.0 with 2 tasks
13/07/09 13:45:09 WARN cluster.ClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered
13/07/09 14:31:19 ERROR NettyRemoteTransport(akka://sparkMaster@localhost:7077): RemoteServerError@akka://sparkMaster@localhost:7077] Error[java.io.OptionalDataException]
Solution:
>./run spark.examples.SparkPi spark://localhost:7077
It is working, so the cluster should be fine.
Finally I found the reason: I needed to start the Spark server as a sudo user.
Error Message:
13/07/09 14:45:03 ERROR executor.Executor: Exception in task ID 0
java.lang.NoClassDefFoundError: Lorg/joda/time/DateTime;
at java.lang.Class.getDeclaredFields0(Native Method)
at java.lang.Class.privateGetDeclaredFields(Class.java:2339)
13/07/09 14:45:03 ERROR executor.Executor: Exception in task ID 1
java.lang.NoClassDefFoundError: Lorg/joda/time/DateTime;
at java.lang.Class.getDeclaredFields0(Native Method)
Solution:
>sbt assembly
Error Message:
[info] Merging 'javax/servlet/SingleThreadModel.class' with strategy 'deduplicate'
java.lang.RuntimeException: deduplicate: different file contents found in the following:
/Users/carl/.ivy2/cache/javax.servlet/servlet-api/jars/servlet-api-2.5.jar:javax/servlet/SingleThreadModel.class
/Users/carl/.ivy2/cache/org.mortbay.jetty/servlet-api-2.5/jars/servlet-api-2.5-6.1.14.jar:javax/servlet/SingleThreadModel.class
/Users/carl/.ivy2/cache/org.mortbay.jetty/servlet-api/jars/servlet-api-2.5-20081211.jar:javax/servlet/SingleThreadModel.class
Similar 'deduplicate' conflicts were reported for, among others:
javax/servlet/SingleThreadModel
org/apache/jasper/compiler/Node
org/fusesource/jansi/Ansi$1.class
org/apache/commons/beanutils/converters/FloatArrayConverter
org/apache/commons/collections
META-INF/native/osx/libjansi.jnilib
META-INF/native/windows32/jansi.dll
META-INF/NOTICE.txt
about.html
Solution:
import sbt._
import Keys._
import sbtassembly.Plugin._
import AssemblyKeys._
mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", "jasper", xs @ _*) => MergeStrategy.first
    case PathList("org", "fusesource", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", "commons", "beanutils", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", "commons", "collections", xs @ _*) => MergeStrategy.first
    case PathList("META-INF", xs @ _*) =>
      (xs map {_.toLowerCase}) match {
        case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: Nil) =>
          MergeStrategy.discard
        case ps @ (x :: xs) if ps.last.endsWith(".sf") || ps.last.endsWith(".dsa") =>
          MergeStrategy.discard
        case "plexus" :: xs =>
          MergeStrategy.discard
        case "services" :: xs =>
          MergeStrategy.filterDistinctLines
        case ("spring.schemas" :: Nil) | ("spring.handlers" :: Nil) =>
          MergeStrategy.filterDistinctLines
        case ps @ (x :: xs) if ps.last.endsWith(".jnilib") || ps.last.endsWith(".dll") =>
          MergeStrategy.first
        case ps @ (x :: xs) if ps.last.endsWith(".txt") =>
          MergeStrategy.discard
        case ("notice" :: Nil) | ("license" :: Nil) =>
          MergeStrategy.discard
        case _ => MergeStrategy.deduplicate
      }
    case "application.conf" => MergeStrategy.concat
    case "about.html" => MergeStrategy.discard
    case x => old(x)
  }
}
>sbt clean update compile package assembly
>sbt run
That is it. I hope it works fine. Certainly, I need to change the SparkContext as follows:
val sc = new SparkContext(sparkMaster,
  "Complex Job", "/opt/spark",
  List("/Users/carl/work/easy/easysparkserver/target/easysparkserver-assembly-1.0.jar"), Map())
Alternatively, we can exclude the conflicting jars from the assembly. Note that the two conditions must go into a single filter; two separate filter calls would discard the first result:
excludedJars in assembly <<= (fullClasspath in assembly) map { cp =>
  cp filter { c =>
    c.data.getName == "parboiled-scala_2.10.0-RC5-1.1.4.jar" ||
    c.data.getName == "minlog-1.2.jar"
  }
}
References:
Spark shell
http://spark-project.org/docs/latest/quick-start.html
Spark Grammar and Examples
http://spark-project.org/examples/
cassandra thrift
http://wiki.apache.org/cassandra/ThriftExamples
http://wiki.apache.org/cassandra/ClientExamples
API document
http://spark-project.org/docs/latest/scala-programming-guide.html
Running Configuration
http://spark-project.org/docs/latest/running-on-yarn.html
http://spark-project.org/docs/latest/running-on-mesos.html
http://spark-project.org/docs/latest/spark-standalone.html
private key with SSH
http://sillycat.iteye.com/blog/1100363
http://sillycat.iteye.com/blog/1756114
sbt assembly
https://github.com/sbt/sbt-assembly