// 知识点记录-使用kafka
package com.test
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.Cluster
import java.util
import java.util.Properties
//测试KAFKA
object SparkKafka {

  /**
   * Kafka producer walk-through: configuration and tuning, async/sync sends,
   * partitioning strategies (explicit partition, key hash, sticky, custom
   * partitioner) and transactional sends.
   */
  def main(args: Array[String]): Unit = {
    // Basic producer settings.
    val properties = new Properties()
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // comma-separated for multiple brokers
    // Key serializer.
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    // properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    // Value serializer.
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    // Custom partitioner (defined below).
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, classOf[CustomPartitioner])
    // Producer-side tuning:
    // RecordAccumulator buffer, default 32M -> 64M.
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024)
    // batch.size, default 16K -> 64K.
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384 * 4)
    // linger.ms, default 0 -> 50ms (5-100ms is a common range).
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 50)
    // Enable snappy compression.
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy")
    // acks = -1 (a.k.a. "all"): leader waits for the full ISR to persist the record.
    properties.put(ProducerConfig.ACKS_CONFIG, "-1")
    // Retries on transient failures.
    properties.put(ProducerConfig.RETRIES_CONFIG, 3)
    // FIX: transactional.id must be set BEFORE initTransactions() is called
    // further down, otherwise initTransactions() throws IllegalStateException.
    // The original left this line commented out.
    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionid1")

    val kafkaProducer = new KafkaProducer[String, String](properties)

    // Shared send callback: log metadata on success, stack trace on failure.
    // FIX: the original called e.printStackTrace() unconditionally, which
    // NPEs on success because e is null when the send succeeded.
    val logCallback: Callback = new Callback {
      override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
        if (e == null) {
          System.out.println(s"主题=${recordMetadata.topic()} 分区=${recordMetadata.partition()}")
        } else {
          e.printStackTrace()
        }
      }
    }

    for (k <- 1 to 5) {
      // FIX: the original juxtaposed string literals and values without '+',
      // which does not compile; string interpolation is used throughout.
      System.out.println(s"kafkaProducer=$kafkaProducer k=$k")
      // Async send: producer -> RecordAccumulator only.
      kafkaProducer.send(new ProducerRecord[String, String]("topic1", s"topic1:$k"))
      // Async send with completion callback.
      kafkaProducer.send(new ProducerRecord[String, String]("topic1", s"topic1:$k"), logCallback)
      // Sync send: producer -> accumulator -> broker (block on the Future).
      kafkaProducer.send(new ProducerRecord[String, String]("topic1", s"topic1:$k")).get()
    }

    // Partitioning. Default: org.apache.kafka.clients.producer.internals.DefaultPartitioner.
    // Partitions let a topic be split across brokers (storage), load-balanced,
    // and consumed in parallel.
    // Explicit partition 0:
    kafkaProducer.send(new ProducerRecord[String, String]("topic1", 0, "", "topic1:"), logCallback)
    // Key-based partitioning: hash(key) % numPartitions.
    kafkaProducer.send(new ProducerRecord[String, String]("topic1", "key1", "topic1:"), logCallback)
    // Default strategy when no key is given: sticky partitioning.
    for (k <- 1 to 500) {
      kafkaProducer.send(new ProducerRecord[String, String]("topic1", s"topic1:$k"), logCallback)
      Thread.sleep(20)
    }
    // Exercise the custom partitioner (values containing "分区器" go to partition 0).
    for (k <- 1 to 50) {
      kafkaProducer.send(new ProducerRecord[String, String]("topic1", s"分区器:$k"))
    }

    // acks notes: the ISR holds the leader plus followers that are in sync
    // within replica.lag.time.max.ms.
    //   acks=0  -> record only in producer memory, may be lost
    //   acks=1  -> leader persisted, may still be lost on leader failover
    //   acks=-1 -> leader + all ISR members persisted
    // At-least-once (no loss, possible dups): acks=-1, replicas >= 2, min.insync.replicas >= 2.
    // At-most-once (possible loss, no dups):  acks=0.
    // Exactly-once: idempotence + transactions.
    // Idempotent dedup key: <PID (renewed on restart), partition, increasing sequence number>.

    // Transactions (requires transactional.id, configured above).
    kafkaProducer.initTransactions()
    kafkaProducer.beginTransaction()
    try {
      for (k <- 1 to 5) {
        kafkaProducer.send(new ProducerRecord[String, String]("topic1", s"事务:$k"))
      }
      kafkaProducer.commitTransaction()
    } catch {
      // FIX: the original used Java syntax `catch (Exception e)`, which is not
      // valid Scala; also surface the failure instead of swallowing it.
      case e: Exception =>
        e.printStackTrace()
        kafkaProducer.abortTransaction()
    }

    // In-partition ordering:
    //   version <  1.x: set max.in.flight.requests.per.connection = 1
    //   version >= 1.x: enable idempotence and keep max.in.flight.requests.per.connection <= 5
    kafkaProducer.close()
  }

  /**
   * Custom partitioner: records whose value contains "分区器" are routed to
   * partition 0, everything else to partition 1.
   * Instantiated reflectively by the producer via partitioner.class.
   */
  class CustomPartitioner extends Partitioner {
    override def partition(topic: String, key: Any, keyBytes: Array[Byte], value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
      // FIX: guard against a null value (e.g. tombstone records) before toString.
      if (value != null && value.toString.contains("分区器")) 0 else 1
    }

    override def close(): Unit = {}

    override def configure(map: util.Map[String, _]): Unit = {}
  }
}
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgchfac
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13