首页 > 其他 > 详细

SparkStreaming/Flink读取Kafka的数据

时间:2019-06-12 11:06:34      阅读:14      评论:0      收藏:0      [点我收藏+]

标签:con   没有初始化   ast   star   工具类   准备   offset   test   bootstra   

spark读取kafka的数据
 
前期准备:创建一个config.properties     
插入:
kafka.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092
 
//新建一个kafka的工具类
object MyKafkaUtil {
  private val properties: Properties = PropertiesUtil.load("config.properties")
  val broker_list = properties.getProperty("kafka.broker.list")
 
  // kafka消费者配置
  val kafkaParam = Map(
    "bootstrap.servers" -> broker_list,//用于初始化链接到集群的地址
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    //用于标识这个消费者属于哪个消费团体
    "group.id" -> "gmall_consumer_group",
    //如果没有初始化偏移量或者当前的偏移量不存在任何服务器上,可以使用这个配置属性
    //可以使用这个配置,latest自动重置偏移量为最新的偏移量
    "auto.offset.reset" -> "latest",
    //如果是true,则这个消费者的偏移量会在后台自动提交,但是kafka宕机容易丢失数据
    //如果是false,会需要手动维护kafka偏移量
    "enable.auto.commit" -> (true: java.lang.Boolean)
  )
 
  // 创建DStream,返回接收到的输入数据
  // LocationStrategies:根据给定的主题和集群地址创建consumer
  // LocationStrategies.PreferConsistent:持续的在所有Executor之间分配分区
  // ConsumerStrategies:选择如何在Driver和Executor上创建和配置Kafka Consumer
  // ConsumerStrategies.Subscribe:订阅一系列主题
 
  def getKafkaStream(topic: String,ssc:StreamingContext):  InputDStream[ConsumerRecord[String,String]]={
    val dStream = KafkaUtils.createDirectStream[String,String](ssc,  LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](Array(topic),kafkaParam))
    dStream
  }
}
 
//消费kafka的数据
object RealtimeStartupApp {
  def main(args: Array[String]): Unit = {
       val sparkConf: SparkConf = new  SparkConf().setMaster("local[*]").setAppName("name")
       val sc = new SparkContext(sparkConf)
       val ssc = new StreamingContext(sc,Seconds(10))
       val startupStream: InputDStream[ConsumerRecord[String, String]] =  MyKafkaUtil.getKafkaStream(kafka_topic,ssc)
}
 
 
Flink消费Kafka的数据
val properties = new Properties()
  // kafka消费者配置
properties.setProperty("bootstrap.servers", "localhost:9092")
 
properties.setProperty("group.id", "consumer-group")
 
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
 
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
 
properties.setProperty("auto.offset.reset", "latest")
//简历Flink环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置时间类型为EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//设置并行度为1 
env.setParallelism(1)
 
val stream = env
  .addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties))

 

 
 

SparkStreaming/Flink读取Kafka的数据

标签:con   没有初始化   ast   star   工具类   准备   offset   test   bootstra   

原文:https://www.cnblogs.com/datacan/p/11008224.html

(0)
(0)
   
举报
评论 一句话评论(0
0条  
登录后才能评论!
© 2014 designnerd.net 版权所有 鲁ICP备09046678号-4
打开技术之扣,分享程序人生!
             

鲁公网安备 37021202000002号