任务二代码
package com.shtd.flink
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.Collector
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties
object t1 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val pro = new Properties()
pro.setProperty("bootstrap.servers", "huangshuang.xyz:9092")
val inputStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("order", new SimpleStringSchema(), pro))
val outputTag = new OutputTag[String]("f")
val dataStream: DataStream[(String, String, Double)] = inputStream
.filter(data => {
if (data.matches("^O.*?"))
true
else
false
})
.process(new ProcessFunction[String, (String, String, Double)] {
override def processElement(value: String, ctx: ProcessFunction[String, (String, String, Double)]#Context, out: Collector[(String, String, Double)]): Unit = {
val str: Array[String] = value.split(",")
if (str(2).equals("F")) {
ctx.output(outputTag, value)
}
out.collect(str(1), str(2), str(3).toDouble)
}
})
val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("huangshuang.xyz").setPort(6379).build()
// 1、 使用Flink消费Kafka中的数据,统计个人实时订单总额
val outputStream1: DataStream[(String, String, Double)] = dataStream
.keyBy(_._1)
.sum(3)
outputStream1.addSink(new RedisSink[(String, String, Double)](config, new RedisMapper[(String, String, Double)] {
override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET, "totalprice1")
override def getKeyFromData(t: (String, String, Double)): String = t._1
override def getValueFromData(t: (String, String, Double)): String = t._3.toString
}))
// 2、 在任务1进行的同时需监控若发现ORDERSTATUS字段为F
val outputStream2: DataStream[String] = dataStream
.getSideOutput(outputTag)
outputStream2.addSink(new RichSinkFunction[String] {
var connection: Connection = _
var preparedStatement: PreparedStatement = _
override def open(parameters: Configuration): Unit = {
connection = DriverManager.getConnection("jdbc:mysql://huangshuang.xyz:3306/Data?characterEncoding=utf-8", "root", "Hs433125-")
preparedStatement = connection.prepareStatement("insert into alarmdata values (?)")
}
override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
preparedStatement.setString(1, value)
preparedStatement.executeUpdate()
}
override def close(): Unit = {
connection.close()
preparedStatement.close()
}
})
// 3、 使用Flink消费kafka中的数据,统计每分钟下单的数量
val outputStream3: DataStream[(String, Int)] = dataStream
.map(data => ("totalorder", 1))
.keyBy(_._1)
.sum(1)
outputStream3.addSink(new RedisSink[(String, Int)](config, new RedisMapper[(String, Int)] {
override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.SET)
override def getKeyFromData(t: (String, Int)): String = t._1
override def getValueFromData(t: (String, Int)): String = t._2.toString
}))
// 4、编写Scala工程代码,使用Flink消费kafka中的数据,统计所有用户每分钟订单总数与订单总额,然后计算出每分钟订单平均金额
val outputStream4: DataStream[(String, Double)] = dataStream
.timeWindowAll(Time.minutes(1))
.process(new ProcessAllWindowFunction[(String, String, Double), (String, Double), TimeWindow] {
override def process(context: Context, elements: Iterable[(String, String, Double)], out: Collector[(String, Double)]): Unit = {
val count: Int = elements.size
val sum: Double = elements.map(_._3).sum
out.collect(("avgprice", sum / count))
}
})
outputStream4.addSink(new RedisSink[(String, Double)](config, new RedisMapper[(String, Double)] {
override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.SET)
override def getKeyFromData(t: (String, Double)): String = t._1
override def getValueFromData(t: (String, Double)): String = t._2.toString
}))
// 5、使用Flink消费kafka中的数据,统计实时订单总额
val outputStream5: DataStream[(String, Double)] = dataStream
.map(data => ("totalprice2", data._3))
.keyBy(_._1)
.sum(1)
outputStream5.addSink(new RedisSink[(String, Double)](config, new RedisMapper[(String, Double)] {
override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.SET)
override def getKeyFromData(t: (String, Double)): String = t._1
override def getValueFromData(t: (String, Double)): String = t._2.toString
}))
env.execute()
}
}
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhefghbh
-
photoshop一直显示正在载入怎么办
PHP中文网 06-16 -
B站在海外打不开怎么办B站打不开解决办法
sixfast6 07-14 -
ipv4和ipv6显示未连接是什么意思原因
PHP中文网 06-22 -
excel表格日期变成井号了怎么办
PHP中文网 06-18 -
excel工具栏变成英文了怎么办
PHP中文网 06-20 -
photoshop怎么把印章抠出并放在另一张图上
PHP中文网 06-15 -
pr做好的序列不见了怎么办
PHP中文网 05-12 -
cad文件字体变成古怪繁体字怎么办
PHP中文网 07-07 -
电脑重启一直显示正在准备windows怎么办
PHP中文网 06-17 -
微信小程序自动跳出来怎么办
PHP中文网 06-11