* Apache Kafka supports a wide range of use cases as a general-purpose messaging system for scenarios where high throughput, reliable delivery, and horizontal scalability are important.
Use cases include:
* Stream Processing
* Website Activity Tracking
* Metrics Collection and Monitoring
* Log Aggregation
* Apache Storm and Apache Spark both work very well in combination with Kafka.
Here's an example of how Kafka can be used with Apache Spark to consume an event stream.
DEVELOPMENT
STEP 1 : Add library dependencies to decoupled-invocation/build.sbt
name := "decoupled-invocation" version := "1.0" scalaVersion := "2.10.4" val sparkVersion = "1.2.0" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided", "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion )
STEP 2 : Create a Kafka stream in the Scala file decoupled-invocation/src/main/scala/EventsConsumerApp.scala
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object EventsConsumerApp {

  def persist() = { }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("spark://prayagupd:7077").setAppName("EventsConsumerApp")
    // micro-batches of 10 seconds
    val sparkStreamingContext = new StreamingContext(sparkConf, Seconds(10))

    val kafkaConf = Map("metadata.broker.list" -> "localhost:9092",
                        "zookeeper.connect" -> "localhost:2181",
                        "group.id" -> "events-topic-consumer-group",
                        "zookeeper.connection.timeout.ms" -> "1000")

    // http://spark.apache.org/docs/latest/streaming-programming-guide.html#discretized-streams-dstreams
    // one receiver thread on events-topic; keys are raw bytes, values are decoded as Strings
    val kafkaDiscretizedStream = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
      sparkStreamingContext, kafkaConf, Map("events-topic" -> 1), StorageLevel.MEMORY_ONLY_SER)

    //persist()
    kafkaDiscretizedStream.print()

    sparkStreamingContext.start()
    sparkStreamingContext.awaitTermination()
  }
}
STEP 3 : Build the application
sbt assembly  ## sbt package won't bundle the kafka jars, so they won't be found when submitting the job to Spark
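The assembly task comes from the sbt-assembly plugin; if the project doesn't already declare it, a minimal project/plugins.sbt would look something like this (the plugin version below is an assumption, pick whichever matches your sbt release):

// project/plugins.sbt -- plugin version is an assumption
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")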
DEPLOYMENT
STEP 4 : Start Kafka broker with default config
cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
#terminal 2
bin/kafka-server-start.sh config/server.properties
STEP 5 : Create Kafka topic events-topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic events-topic
Created topic "events-topic".
Verify the topic was created
bin/kafka-topics.sh --list --zookeeper localhost:2181
events-topic
STEP 6 : Produce events to topic events-topic using the Kafka console producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic events-topic
{'message' : 'user is logging in'}
{'message' : 'User logged in'}
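The same events could also be produced programmatically rather than from the console; here is a minimal sketch assuming the Scala producer API that ships with Kafka 0.8.x (the EventsProducerApp name is just for illustration):

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object EventsProducerApp {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092")             // same broker as above
    props.put("serializer.class", "kafka.serializer.StringEncoder") // send message values as Strings

    val producer = new Producer[String, String](new ProducerConfig(props))
    producer.send(new KeyedMessage[String, String]("events-topic", "{'message' : 'user is logging in'}"))
    producer.send(new KeyedMessage[String, String]("events-topic", "{'message' : 'User logged in'}"))
    producer.close()
  }
}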
STEP 7 : Submit the Spark job that will consume events produced by the above Kafka producer
/usr/local/spark-1.2.0/bin/spark-submit --class EventsConsumerApp --master spark://prayagupd:7077 target/scala-2.10/decoupled-invocation-assembly-1.0.jar

(null,{'message' : 'user is logging in'})
(null,{'message' : 'User logged in'})
15/09/20 17:04:12 INFO scheduler.JobScheduler: Finished job streaming job 1442786650000 ms.0 from job set of time 1442786650000 ms
15/09/20 17:04:12 INFO scheduler.JobScheduler: Total delay: 2.294 s for time 1442786650000 ms (execution: 2.289 s)
15/09/20 17:04:12 INFO rdd.BlockRDD: Removing RDD 5 from persistence list
15/09/20 17:04:12 INFO storage.BlockManager: Removing RDD 5
15/09/20 17:04:12 INFO kafka.KafkaInputDStream: Removing blocks of RDD BlockRDD[5] at createStream at EventsConsumerApp.scala:39 of time 1442786650000 ms
15/09/20 17:04:12 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1442786630000 ms)
15/09/20 17:04:12 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/09/20 17:04:20 INFO scheduler.JobScheduler: Added jobs for time 1442786660000 ms
As seen in the console output, events produced with the Kafka console producer are consumed by the Apache Spark + Kafka application, and can then be persisted to NoSQL databases or wherever else is needed.
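As a sketch of what that persistence step could look like, the empty persist() above might be filled in along these lines; EventsDAO here is a hypothetical stand-in for whatever NoSQL client is actually used, not something from the post:

import org.apache.spark.streaming.dstream.DStream

// hypothetical persist() body: write each consumed message through a NoSQL client
def persist(stream: DStream[(Array[Byte], String)]): Unit = {
  stream.foreachRDD { rdd =>
    rdd.foreachPartition { events =>
      events.foreach { case (_, message) => EventsDAO.save(message) } // EventsDAO is assumed, not part of the post
    }
  }
}

Doing the writes inside foreachPartition keeps connection setup per partition rather than per record.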
Source code
scalability-patterns/decoupled-invocation
Resources
Spark Streaming + Kafka Integration Guide