• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

sparkstreaming 和 kafka重分区的场景应用

武飞扬头像
样柏
帮助1

sparkstreaming 与 kafka重分区的场景应用

昨天线上发现个bug,导致数据的重复,原因如下

线上场景是二个sparkstreaming程序。程序1主要是用来接收数据放入kafka集群,程序2读取数据进行处理,redis数据缓存。因为数据量很大,所以在程序1上先用reduceByKey去重。
学新通

程序1发送使用的是Avro序列化对象,要把固定条数一批数据都放在一个Avro对象然后传输到Kafka

对于kafka的分区器:
1 如果消息自带key则对key可以hash然后选择目标分区;
2 如果消息无key则采用RoundRobin轮询算法,这样可以最大限度确保消息在所有分区的均匀性;
3 特别的,生产者API赋予用户自行指定分区的权利,在发送消息时如果指定了分区则可以跳过以上分区法则。

程序一开始没有指定key进行发送的情况
(https://img-blog.csdnimg.cn/c8075a7606f949d5be566ba492038458.png学新通
这里的程序二的场景是通过redis存储用户的信息 更新信息,如果sparkStreaming同一批次 存在同一个用户的多条记录去get redis ,因为redis更新不在这一阶段,如果满足业务条件的记录有多条那就可能会出现业务数据的冗余。

bug的原因找到了,解决的方法非常直观 就是程序二的sparkStreaming一批次在这一个rdd中同一个用户的数据只能有一个。
这里满足条件可以分为二个方面

1.一个用户消息在只能存在同一个分区

先来看怎么满足第一个条件,程序一通过reduceByKey将一个用户消息放置在同一个分区,之前因为发送时Producer没有指定key所以导致发送时类轮询发送。
此时可能很容易想到指定用户唯一标识为key就行,但发送采用的是Avro对象,必定会存在一个对象存储多个用户信息,reduceByKey的分区策略和Kafka默认的分区策略肯定不相同。

思考片刻后发现如果要把一个对象里所有的用户数据都发到一个分区,也就是他们分区算法要相同 ,且分区也要相同
reduceByKey的可以指定分区策略(默认HashPartitioner),KafkaProducer也可以



这二个分区算法是我这边可控的,但还有一个因素就是分区数,程序一的分区要和kafka的分区保持一致,所以如果kafka增加了分区,那么我们这边程序应该实时的感知到,然后改变分区

程序一自定义reduceByKey和kafka分区器代码如下

这里的用socket流演示下
import BroadcastUtil.{parFunc, parFunc1}
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Durations, StreamingContext}

import java.util
import java.util.Properties
import java.util.concurrent.{LinkedBlockingDeque, ThreadPoolExecutor, TimeUnit}
import scala.util.Random



object Hash_Partition {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkStreamingSocket").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Durations.seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")
    BroadcastUtil.creatInstance(ssc.sparkContext)
    val properties = new util.HashMap[java.lang.String,Object]()
    properties.put("bootstrap.servers", "localhost:9092")
    properties.put("key.serializer", classOf[StringSerializer])
    properties.put("value.serializer", classOf[StringSerializer])
    val kafkaProducer:Broadcast[KafkaSink[String,String]] = ssc.sparkContext.broadcast(KafkaSink(properties))

    val inputDS: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.21.160", 6666)
    inputDS.foreachRDD(rdd => {


      ThreadPoolUtil.threadPoolExecutor.execute(new Runnable {
        override def run(): Unit = BroadcastUtil.update(rdd.sparkContext)
      })

      val rdd1 = rdd.map((_, Random.nextInt(10))).reduceByKey(new Partitioner {

        override def getPartition(key: Any): Int = {
          parFunc(key.toString, numPartitions)

        }

        override def numPartitions: Int = BroadcastUtil.getInstance().value
      }, (v1, v2) => {
        if (v1 > v2) v1 else v2
      })

      rdd1.mapPartitionsWithIndex((index, it) => {
        it.map(f => {
          println(s"key:${f._1} 当前分区: ${index} ")
          f
        })
      }).collect()

      rdd1.foreachPartition(partition => {
        //var Num = 0
        partition.foreach(f => {
          /**
           * //这里一般会将多条数据写到一个Avro对象里
           *
           * GenericRecord.read.(ByteArrayInputStream(f))
           * if(Num%一批数量){
           *  kafkaProducer.value.send(GenericRecord)
           * }
           * Num =1
           */
          val producer:KafkaSink[String,String] = kafkaProducer.value
          producer.send("topic_hash", parFunc1(f._1, BroadcastUtil.getInstance().value), null, f._1)
        })
      })
    })

    ssc.start()
    ssc.awaitTermination()

  }
}
学新通
线程池类
object ThreadPoolUtil{
   var threadPool:ThreadPoolExecutor = null
  // 创建单线程-线程池,任务依次执行
  def threadPoolExecutor:ThreadPoolExecutor = {
    if (threadPool==null) {
      threadPool = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new LinkedBlockingDeque[Runnable](2), new ThreadPoolExecutor.CallerRunsPolicy)
    }
    threadPool
  }
}
广播变量单例类
object BroadcastUtil extends Serializable {
  @volatile private var instance: Broadcast[Int] = null

  def creatInstance(sc: SparkContext): Unit = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast(getKafkaPartitionNums("topic_hash"))
        }
      }
    }
  }

  def getInstance(): Broadcast[Int] = {
    instance
  }

  def update(sc: SparkContext): Unit = {
    val newPartitionNum = getKafkaPartitionNums("topic_hash")
    if (instance != null && newPartitionNum != instance) {
      instance.unpersist()
      instance = sc.broadcast(newPartitionNum)
    }

  }

  private[this] def getKafkaPartitionNums(topic: String): Int = {
    val partitionNums = getProducer().partitionsFor(topic).size()
   // println(s"${new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis())} upadte:newPartitionNum  ${partitionNums}")
    getProducer().close()
    producer = null
    partitionNums
  }

  private var producer: KafkaProducer[String, String] = null

  private def getProducer(): KafkaProducer[String, String] = {
    if (producer == null) {
      val properties = new Properties()
      properties.put("bootstrap.servers", "localhost:9092")
      properties.put("key.serializer", classOf[StringSerializer])
      properties.put("value.serializer", classOf[StringSerializer])
      producer = new KafkaProducer[String, String](properties)
    }
    producer
  }

  def parFunc(key: String, numPartitions: Int): Int = {
    val partition = key.hashCode().abs % numPartitions
    partition
  }
  def parFunc1(key: String, numPartitions: Int): Int = {
    val partition = parFunc(key, numPartitions)
    println(s"key:${key} 指定分区:${partition} 总分区:${numPartitions}")
    partition
  }
}



学新通
封装的kafkaProducer类
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

import java.util.concurrent.Future

class KafkaSink[K,V](createProducer: () => KafkaProducer[K,V]) extends Serializable {

  lazy val producer: KafkaProducer[K,V] = createProducer()

  def send(topic: String, partition:Int,key:K, value: V): Future[RecordMetadata] =
    {
      producer.send(new ProducerRecord[K,V](topic,partition,key,value),new Callback {
        override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
          println("发送成功, 累加器 1 & 入库 1")
        }
      })
    }

  def flush() = producer.flush()
}

object KafkaSink {
  def apply[K, V](config:java.util.Map[java.lang.String, Object]): KafkaSink[K, V] = {
    val createProducerFunc: () => KafkaProducer[K, V] = () => {
      val producer = new KafkaProducer[K, V](config)
      sys.addShutdownHook {
        producer.close()
      }
      producer
    }
    new KafkaSink[K, V](createProducerFunc)
  }

}
学新通

创建topic 5个分区
.\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 5 --topic topic_hash

学新通
启动程序
依次输入 a b c d e f g h i j k l

学新通


学新通
订阅0分区,j和k是0分区
![在这里插入图片描述](https://img-blog.csdnimg.cn/f582ca3cf690464b98c98770504e50f8.png

当前分区代表spark分区,指定分区代表Kafka的分区

consumer代码

import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.{ TopicPartition}
import org.apache.kafka.common.serialization.{StringDeserializer}

import java.util.{Properties}

object KafkaConsumer {

  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.put("bootstrap.servers", "localhost:9092")
    properties.put("key.deserializer", classOf[StringDeserializer])
    properties.put("value.deserializer", classOf[StringDeserializer])
    properties.put("auto.offset.reset", "earliest")
    properties.put("enable.auto.commit", "true")
    properties.put("group.id", "group0231")

    val consumer = new KafkaConsumer[String, String](properties)
    consumer.assign(java.util.Arrays.asList(new TopicPartition("topic_hash",0)))

    while (true){
      val records: ConsumerRecords[String, String] = consumer.poll(100)
      import scala.collection.convert.wrapAll._
      for (record <- records){
        val topicName: String = record.topic()
        val partition: Int = record.partition()
        val offset: Long = record.offset()
        val msg: String = record.value()
        println(s"receive: ${msg}\t${topicName}\t${partition}\t${offset}")
      }
    }
  }
}
学新通
通过改变kafka集群分区,sparkStreaming分区会感知到(使用通过更新广播变量)

学新通

2.同一个分区内用户消息只能存在一个

现在从程序一上看似乎是每一批次同一个用户只存在一个
但从程序二上看,因为接的批次不同,尽管同一用户不再分布在各个分区,但是同一分区会存在同一用户多条记录

学新通

这个看似可以在程序二上通过去重解决,但是忽略了业务场景,以下简略列出使用 简单的去重/不使用去重 出现的一些问题

学新通
这里假设在极端情况下 取一个用户在某个分区的情况
结果一 :代表使用使用如reduceByKey取一批时间戳最大的去重,线上的场景是根据用户自身的时间戳取数据,隔一段固定时间取粉色那条数据, 存在的问题是数据取少了
还存在一个问题是分组产生shuffle,落地大量文件,影响程序性能
(程序一的reduceByKey数据的间隔很小 ,不会出现过滤跨度超过业务需求)

结果二:代表不去重,很明显有大量数据冗余

这个可以通过用scala的集合类划分数据解决

Dstream_w.foreachRDD(rdd=>{
      rdd.foreachPartition(partition=>{
        val list:List[ReceiveDataObject] = partition.toList
        //person_code List[ReceiveDataObject]
        val person_code:Map[String,List[ReceiveDataObject]] = list.groupBy(_.person_code)

        var timestamp= 0L
        //这里基于拿到需要的数据
        val iterable:Iterable[List[ReceiveDataObject]] = person_code.map(entry => {
          entry._2.reduce((op1: ReceiveDataObject, op2: ReceiveDataObject) => {
            if (timestamp == 0L) timestamp = op1.timestamp

            if (op1.timestamp - op2.timestamp > new Properties().get("").toString.toLong) {
              timestamp = op1.timestamp
              op1.load = true
            } else {
              if (op2.timestamp - timestamp > new Properties().get("").toString.toLong) {
                op2.load = true
              }
            }
          })
          entry._2.filter(_.load)
        })

        val iterable1:Iterable[List[ReceiveDataObject]] = person_code.map(entry => {
          entry._2.filter(f=>{
            if (timestamp == 0L) timestamp = {
              f.timestamp
            } else {
              if (f.timestamp - timestamp > new Properties().get("").toString.toLong) {
                return true
              }
            }
            return false
          })
        })
      })
    })
学新通

这个根据业务逻辑可以简单划分后就没有问题了

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgchbbe
系列文章
更多 icon
同类精品
更多 icon
继续加载