• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

知识点记录-使用kafka

武飞扬头像
dong-123456
帮助1

知识点记录-使用kafka

package com.test

import org.apache.kafka.clients.producer._
import org.apache.kafka.common.Cluster

import java.util
import java.util.Properties

//测试KAFKA
/**
 * Kafka producer demo: basic configuration, producer-side tuning, the three send
 * modes (fire-and-forget / async-with-callback / synchronous), partitioning
 * strategies, and a (guarded) transactional send.
 *
 * NOTE(review): requires a broker at localhost:9092 and an existing "topic1";
 * this is a runnable tutorial, not production code.
 */
object SparkKafka {

  def main(args: Array[String]): Unit = {

    // --- Basic producer configuration ---
    val properties = new Properties()
    // Comma-separated list for multiple brokers.
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    // Key serializer.
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    //    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    // Value serializer.
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    // Custom partitioner (see CustomPartitioner below).
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, classOf[CustomPartitioner])

    // --- Producer-side tuning ---
    // RecordAccumulator buffer: raise to 64 MB.
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024)
    // batch.size: default 16 KB, raise to 64 KB.
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384 * 4) // 64 KB batches
    // linger.ms: default 0 ms; 5-100 ms is the usual recommendation.
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 50) // 50 ms
    // Enable snappy compression.
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy")

    // acks=-1 (all): leader and the full ISR must persist the record.
    properties.put(ProducerConfig.ACKS_CONFIG, "-1")
    // Retries on transient send failures.
    properties.put(ProducerConfig.RETRIES_CONFIG, 3)

    val kafkaProducer = new KafkaProducer[String, String](properties)

    // Shared send callback: print the error, or topic/partition on success.
    // FIX: the original called e.printStackTrace() unconditionally, which throws
    // NullPointerException on every SUCCESSFUL send (e is null on success).
    val logCallback: Callback = new Callback {
      override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
        if (e != null) {
          e.printStackTrace()
        } else {
          println(s"主题=${recordMetadata.topic()} 分区=${recordMetadata.partition()}")
        }
      }
    }

    for (k <- 1 to 5) {
      println(s"kafkaProducer=$kafkaProducer  k=$k")

      // Fire-and-forget: producer -> RecordAccumulator, no delivery feedback.
      kafkaProducer.send(new ProducerRecord[String, String]("topic1", s"topic1:$k"))
      // Asynchronous send with completion callback.
      kafkaProducer.send(new ProducerRecord[String, String]("topic1", s"topic1:$k"), logCallback)
      // Synchronous send: producer -> buffer -> broker; get() blocks until acked.
      kafkaProducer.send(new ProducerRecord[String, String]("topic1", s"topic1:$k")).get()
    }

    // Partitioning. Default: org.apache.kafka.clients.producer.internals.DefaultPartitioner.
    // A topic split across partitions can live on multiple brokers,
    // giving load balancing and parallelism.

    // Explicit partition 0 (partition given directly in the record).
    kafkaProducer.send(new ProducerRecord[String, String]("topic1", 0, "", "topic1:"), logCallback)

    // Key-based partitioning: hash(key) % numPartitions.
    kafkaProducer.send(new ProducerRecord[String, String]("topic1", "key1", "topic1:"), logCallback)

    // No key, no explicit partition -> default STICKY partitioning.
    for (k <- 1 to 500) {
      kafkaProducer.send(new ProducerRecord[String, String]("topic1", s"topic1:$k"), logCallback)
      Thread.sleep(20)
    }

    // Exercise CustomPartitioner: values containing "分区器" route to partition 0.
    for (k <- 1 to 50) {
      kafkaProducer.send(new ProducerRecord[String, String]("topic1", s"分区器:$k"))
    }

    // Delivery-semantics notes:
    //   ISR: leader + followers currently in sync (bounded by replica.lag.time.max.ms).
    //   acks=0 -> may lose data (in-memory only); acks=1 -> leader only, may lose;
    //   acks=-1 -> leader + full ISR persisted.
    //   At-least-once (no loss, possible duplicates): acks=-1, replicas >= 2, min.insync.replicas >= 2.
    //   At-most-once (possible loss, no duplicates): acks=0.
    //   Exactly-once: idempotence + transactions; dedup key is <PID (reset on restart), partition, monotonic sequence>.
    //
    // FIX: initTransactions() throws IllegalStateException unless transactional.id
    // is configured, and that property is (deliberately) commented out above —
    // so the transactional demo is guarded instead of crashing unconditionally.
    //properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionid1")
    if (properties.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) {
      kafkaProducer.initTransactions()
      kafkaProducer.beginTransaction()
      try {
        for (k <- 1 to 5) {
          kafkaProducer.send(new ProducerRecord[String, String]("topic1", s"事务:$k"))
        }
        kafkaProducer.commitTransaction()
      } catch {
        // FIX: the original used Java syntax `catch (Exception e)`, which does not
        // compile in Scala; also surface the failure instead of swallowing it.
        case e: Exception =>
          e.printStackTrace()
          kafkaProducer.abortTransaction()
      }
    }

    // Ordering within a partition:
    //   version < 1.x : max.in.flight.requests.per.connection = 1
    //   version >= 1.x: enable idempotence and keep max.in.flight.requests.per.connection <= 5

    kafkaProducer.close()
  }

  /**
   * Demo partitioner: records whose value contains "分区器" go to partition 0,
   * everything else to partition 1.
   */
  class CustomPartitioner extends Partitioner {
    override def partition(topic: String, key: Any, keyBytes: Array[Byte], value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
      // FIX: guard against a null value (e.g. tombstone records) to avoid an NPE on toString.
      if (value != null && value.toString.contains("分区器")) 0 else 1
    }

    override def close(): Unit = ()

    override def configure(map: util.Map[String, _]): Unit = ()
  }


}
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgchfac
系列文章
更多 icon
同类精品
更多 icon
继续加载