sillycat

Spark Streaming(1)Spark Streaming Concept and Zookeeper/Kafka on Local

 

I have been using Spark for more than a year, running versions 0.7 through 0.9 in production. Recently I came back to Spark and am considering an upgrade to 1.3.1. There are a lot of new features and good ideas since 0.9.

1. Introduction
Standalone Cluster
The master machine is a single point of failure.
https://spark.apache.org/docs/latest/spark-standalone.html#standby-masters-with-zookeeper
Option #1 is to use ZooKeeper to manage several masters, so that a standby master can take over when the active one fails.
Set these properties through SPARK_DAEMON_JAVA_OPTS in spark-env.sh:
spark.deploy.recoveryMode, default value is NONE, should be changed to ZOOKEEPER
spark.deploy.zookeeper.url, e.g. 192.168.1.100:2181,192.168.1.102:2181
spark.deploy.zookeeper.dir, e.g. /spark
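
To make the shape of that setting concrete, here is a minimal sketch that renders the three properties as JVM -D flags. The object name ZkRecoveryOpts and the helper daemonJavaOpts are made up for illustration, and the ZooKeeper addresses are just the example values from above.

```scala
// Hypothetical helper (not part of Spark): renders the three recovery
// properties as -D flags for the SPARK_DAEMON_JAVA_OPTS line in spark-env.sh.
object ZkRecoveryOpts {
  def daemonJavaOpts(zkUrl: String, zkDir: String): String =
    Seq(
      "spark.deploy.recoveryMode" -> "ZOOKEEPER",
      "spark.deploy.zookeeper.url" -> zkUrl,
      "spark.deploy.zookeeper.dir" -> zkDir
    ).map { case (key, value) => s"-D$key=$value" }.mkString(" ")

  def main(args: Array[String]): Unit = {
    // Example values only; replace them with your own ZooKeeper ensemble.
    val opts = daemonJavaOpts("192.168.1.100:2181,192.168.1.102:2181", "/spark")
    // Prints the line to paste into conf/spark-env.sh on every master.
    println("export SPARK_DAEMON_JAVA_OPTS=\"" + opts + "\"")
  }
}
```

The printed line goes into conf/spark-env.sh on each master machine.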

For a Spark job, the standalone cluster keeps all of the application's jars and files in the worker's working directory, so we need to turn on spark.worker.cleanup.enabled and set spark.worker.cleanup.appDataTtl to clean them up. A YARN cluster does that automatically.

Cluster Job Schedule
standalone cluster - FIFO; spark.cores.max, spark.deploy.defaultCores and other settings control how many resources one application can use.
Mesos
YARN - --num-executors, --executor-memory, etc.
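
How the two standalone settings interact can be sketched with a small model. This is my simplified reading of the documented behavior, not Spark's scheduler code: an application is capped by spark.cores.max if it sets one, otherwise by spark.deploy.defaultCores if the master sets one, otherwise it takes every free core. The names StandaloneCores and grantedCores are made up for illustration.

```scala
// Simplified model (an assumption, not Spark's actual scheduler code).
object StandaloneCores {
  // coresMax:     the application's spark.cores.max, if set
  // defaultCores: the master's spark.deploy.defaultCores, if set
  // freeCores:    cores not yet taken by earlier applications in the FIFO queue
  def grantedCores(freeCores: Int, coresMax: Option[Int], defaultCores: Option[Int]): Int = {
    val cap = coresMax.orElse(defaultCores).getOrElse(Int.MaxValue)
    math.min(freeCores, cap)
  }

  def main(args: Array[String]): Unit = {
    println(grantedCores(16, Some(4), Some(8))) // the application's own cap applies
    println(grantedCores(16, None, Some(8)))    // falls back to the cluster default
    println(grantedCores(16, None, None))       // no cap: takes all free cores
  }
}
```

Under this model, leaving both settings unset lets the first application take the whole cluster, which is why a cap matters when several applications share a FIFO standalone cluster.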

Spark Streaming
Sources include Kafka, Flume, Twitter, ZeroMQ and Kinesis.

original DStream   time1 time2 time3 time4 time5
windowed DStream   window time1         window time2
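
The picture above can be modeled with plain Scala collections. This is only a sketch of the grouping, not the DStream API: I am assuming a window length of 3 batches and a slide of 2 batches, which the picture itself does not state, and WindowSketch is a made-up name. On a real DStream the corresponding call is window(windowLength, slideInterval).

```scala
// Plain-collections model of windowing; no Spark needed to run it.
object WindowSketch {
  // Groups consecutive batches into windows of `windowLength` batches,
  // moving forward `slide` batches each time. If the batches do not divide
  // evenly, the last group returned by `sliding` may be shorter.
  def windows[A](batches: Seq[A], windowLength: Int, slide: Int): List[Seq[A]] =
    batches.sliding(windowLength, slide).toList

  def main(args: Array[String]): Unit = {
    val original = Seq("time1", "time2", "time3", "time4", "time5")
    // Assumed parameters: window length 3, slide 2.
    windows(original, 3, 2).foreach(w => println(w.mkString(" ")))
  }
}
```

With these assumed parameters, the first window covers time1 to time3 and the second covers time3 to time5, so time3 is processed in both windows.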

checkpoint
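ssc.checkpoint(hdfsPath) sets the checkpoint directory. The checkpoint interval is usually 5 to 10 times the sliding interval of the DStream; for a 10-second sliding interval, that means 50 to 100 seconds.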

dstream.checkpoint(checkpointInterval)

Receive the stream in parallel:
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(…) }
val unifiedStream = streamingContext.union(kafkaStreams)

Recover the Task from a Checkpoint
// Builds a fresh context; only called when no checkpoint exists yet
def functionToCreateContext(): StreamingContext = {
     val ssc = new StreamingContext(...)
     val lines = ssc.socketTextStream(...)
     ...
     ssc.checkpoint(checkpointDirectory)
     ssc
}

// Restores the context from the checkpoint if there is one, otherwise creates a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
context. ...
context.start()
context.awaitTermination()

2. Zookeeper
http://sillycat.iteye.com/blog/2015175

Install zookeeper
> wget http://apache.mesi.com.ar/zookeeper/stable/zookeeper-3.4.6.tar.gz
Unpack it, place it in the working directory, and add its bin directory to the PATH.

Set up the configuration
> cp conf/zoo_sample.cfg conf/zoo.cfg

Start the Server
> zkServer.sh start zoo.cfg

Check status
> zkServer.sh status

Or

> jps
2194
2294 QuorumPeerMain
2330 Jps
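
The same check can be done from code. Here is a minimal sketch that sends ZooKeeper's four-letter command ruok over a plain socket; a healthy server answers imok and closes the connection. The names ZkHealthCheck and ruok are made up, localhost:2181 assumes the default client port from zoo_sample.cfg, and newer ZooKeeper releases may need the command whitelisted first.

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

object ZkHealthCheck {
  // Sends "ruok" and returns whatever the server answers
  // ("imok" when healthy, an empty string if nothing comes back).
  def ruok(host: String, port: Int): String = {
    val socket = new Socket(host, port)
    try {
      val out = socket.getOutputStream
      out.write("ruok".getBytes("US-ASCII"))
      out.flush()
      val in = new BufferedReader(new InputStreamReader(socket.getInputStream, "US-ASCII"))
      // The reply has no trailing newline; readLine returns once the server closes.
      Option(in.readLine()).getOrElse("")
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit =
    // Assumes a local server on the default client port.
    println(ruok("localhost", 2181))
}
```

If nothing is listening on that port, the Socket constructor throws a java.net.ConnectException, which is itself a useful signal that the server did not start.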

Connect from client
> zkCli.sh -server localhost:2181
zookeeper>help
zookeeper>quit

3. Kafka
Download the binary, version 0.8.2.1
> wget http://psg.mtu.edu/pub/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz
Unpack it, place it in the working directory, and add its bin directory to the PATH.

Command to start kafka
> kafka-server-start.sh config/server.properties

Create a topic
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

List the topic
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

Start a producer and send some messages
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Start a Consumer
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning



References:
http://uohzoaix.github.io/studies/categories/#spark

spark streaming
http://dataunion.org/15193.html
http://dataunion.org/6308.html
http://colobu.com/2015/01/05/kafka-spark-streaming-integration-summary/