博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
数据零丢失kafka + zookeeper
阅读量:5033 次
发布时间:2019-06-12

本文共 3795 字,大约阅读时间需要 12 分钟。

package kafkautils

import kafka.ZookeeperHelper.client

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.I0Itec.zkclient.ZkClient
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

import scala.collection.JavaConversions._

/**

* Created on 下午9:37.
*/
/**
 * Spark Streaming direct-Kafka driver that checkpoints consumer offsets in ZooKeeper
 * (via Apache Curator) so a restarted job resumes exactly after the last fully
 * processed batch — no data loss and no silent fallback to `auto.offset.reset`.
 *
 * ZK layout (under the Curator namespace): /consumers/offsets/{group}/{topic}/{partition}
 * where each partition znode stores the next offset to consume as a UTF-8 decimal string.
 */
object KafkaZKManager extends Serializable {

  // Shared Curator client, started eagerly when the object is first referenced.
  // NOTE(review): connect string and namespace are hard-coded — externalize for real deployments.
  val client = {
    val client = CuratorFrameworkFactory
      .builder
      .connectString("spark123:12181/kafka0.9")
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .namespace("mykafka")
      .build()
    client.start()
    client
  }

  // Root ZK path for stored offsets. (Original spelling kept: public member, callers may reference it.)
  val kakfaOffsetRootPath = "/consumers/offsets"

  /** Create `path` (and any missing parents) in ZooKeeper if it does not already exist. */
  def ensureZKPathExists(path: String): Unit = {
    if (client.checkExists().forPath(path) == null) {
      client.create().creatingParentsIfNeeded().forPath(path)
    }
  }

  /**
   * Persist the end offset of every processed range to ZooKeeper.
   *
   * BUG FIX: the original loop body was empty, so nothing was ever written and every
   * restart silently fell back to `auto.offset.reset` — defeating the purpose of this class.
   *
   * @param offsetsRanges offset ranges of the batch that has just been processed
   * @param groupName     consumer group name, used as part of the ZK path
   */
  def storeOffsets(offsetsRanges: Array[OffsetRange], groupName: String) = {
    for (o <- offsetsRanges) {
      // untilOffset is exclusive — it is exactly the offset the next batch should start from.
      val zkPath = s"${kakfaOffsetRootPath}/${groupName}/${o.topic}/${o.partition}"
      ensureZKPathExists(zkPath)
      client.setData().forPath(zkPath, o.untilOffset.toString.getBytes)
    }
  }

  /**
   * Read previously stored offsets for `topic`/`groupName` from ZooKeeper.
   *
   * @return (offsets, flag): flag == 1 means stored offsets were found (resume from them),
   *         flag == 0 means none exist yet (caller should start per `auto.offset.reset`).
   */
  def getFromOffsets(topic: String, groupName: String): (Map[TopicAndPartition, Long], Int) = {
    val zkTopicPath = s"${kakfaOffsetRootPath}/${groupName}/${topic}"
    ensureZKPathExists(zkTopicPath)
    // One child znode per partition; its data is the saved offset as a decimal string.
    val offsets = for {
      p <- client.getChildren.forPath(zkTopicPath)
    } yield {
      val data = client.getData.forPath(s"$zkTopicPath/$p")
      val offset = java.lang.Long.valueOf(new String(data)).toLong
      println("offset:" + offset)
      (TopicAndPartition(topic, Integer.parseInt(p)), offset)
    }
    // Single expression replaces the original if/else whose branches differed only in the flag.
    (offsets.toMap, if (offsets.isEmpty) 0 else 1)
  }

  /**
   * Build a direct Kafka stream of (topic, message) pairs, resuming from
   * ZooKeeper-stored offsets when they exist.
   *
   * @param ssc         active streaming context
   * @param kafkaParams Kafka consumer configuration (must include `metadata.broker.list`)
   * @param topic       comma-separated topic list
   * @param groupName   consumer group whose saved offsets are consulted
   */
  def createMyDirectKafkaStream(ssc: StreamingContext, kafkaParams: Map[String, String], topic: String, groupName: String
  ): InputDStream[(String, String)] = {
    val (fromOffsets, flag) = getFromOffsets(topic, groupName)

    // `if` as an expression removes the original `var kafkaStream = null` pattern.
    if (flag == 1) {
      // Resume path: map each record to (topic, message) so both branches yield the same tuple type.
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
      println("fromOffsets:" + fromOffsets)
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    } else {
      // No stored offsets: start position is governed by `auto.offset.reset` in kafkaParams.
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic.split(",").toSet)
    }
  }

  def main(args: Array[String]): Unit = {

    val processingInterval = 2
    val brokers = "spark123:9092"
    val topic = "mytest1"
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[2]")
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
      // "smallest" = earliest available offset when no checkpoint exists (Kafka 0.8/0.9 wording).
      "auto.offset.reset" -> "smallest")

    val ssc = new StreamingContext(sparkConf, Seconds(processingInterval))

    val messages = createMyDirectKafkaStream(ssc, kafkaParams, topic, "testp")

    messages.foreachRDD((rdd, btime) => {
      if (!rdd.isEmpty()) {
        println("==========================:" + rdd.count())
        println("==========================btime:" + btime)
      }
      // Commit offsets only after the batch's processing above has completed (at-least-once).
      storeOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, "testp")
    })

    ssc.start()

    ssc.awaitTermination()
  }

}

转载于:https://www.cnblogs.com/heguoxiu/p/10149629.html

你可能感兴趣的文章
华为手机权限开启方法4
查看>>
(hzau)华中农业大学第四届程序设计大赛网络同步赛 G: Array C
查看>>
新概念 Lesson 3 Nice to meet you
查看>>
第一次JAVA作业
查看>>
POJ-3041 Asteroids 二分图匹配
查看>>
HDU-1280 前m大的数
查看>>
redis基本指令
查看>>
制作动态链接库
查看>>
Frame 处理
查看>>
读代码
查看>>
pythonweb框架Flask学习笔记02-一个简单的小程序
查看>>
火星坐标系 (GCJ-02) 与百度坐标系 (BD-09) 的转换算法
查看>>
NSThread创建方式
查看>>
Hadoop+Spark+Hbase部署整合篇
查看>>
Android基础类之BaseAdapter
查看>>
Pagerank
查看>>
电脑重装系统按什么键进U盘PE
查看>>
MyEclipse2014安装图解
查看>>
Gym 100733C
查看>>
如果你觉得我的博客对你有帮助,请帮忙加点我所在团队博客访问量http://home.cnblogs.com/u/newbe/...
查看>>