Task 2 Code
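
The job below (Scala, Flink DataStream API) reads comma-separated order records from the Kafka topic order, fans the parsed stream out into five statistics written to Redis, and routes records whose ORDERSTATUS field is F to MySQL through a side output. Judging from the split(",") indexing, field 1 holds the user id, field 2 the order status and field 3 the order amount (0-indexed); this layout is inferred from the code rather than stated anywhere in the source.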

package com.shtd.flink

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.Collector

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties

object t1 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val pro = new Properties()
    pro.setProperty("bootstrap.servers", "huangshuang.xyz:9092")
    val inputStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("order", new SimpleStringSchema(), pro))
    val outputTag = new OutputTag[String]("f")
    val dataStream: DataStream[(String, String, Double)] = inputStream
      .filter(_.startsWith("O")) // keep only order records (lines beginning with "O")
      .process(new ProcessFunction[String, (String, String, Double)] {
        override def processElement(value: String, ctx: ProcessFunction[String, (String, String, Double)]#Context, out: Collector[(String, String, Double)]): Unit = {
          val str: Array[String] = value.split(",")
          // route records whose ORDERSTATUS field is "F" to the side output for the MySQL alarm sink
          if (str(2) == "F") {
            ctx.output(outputTag, value)
          }
          out.collect((str(1), str(2), str(3).toDouble))
        }
      })

    // a single Jedis pool config, reused by all four Redis sinks below
    val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("huangshuang.xyz").setPort(6379).build()

    //    1. Use Flink to consume the Kafka data and keep a real-time order total per user
    val outputStream1: DataStream[(String, String, Double)] = dataStream
      .keyBy(_._1)
      .sum(2) // the order amount is tuple position 2 (0-indexed); .sum(3) would be out of range for a Tuple3
    outputStream1.addSink(new RedisSink[(String, String, Double)](config, new RedisMapper[(String, String, Double)] {
      override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET, "totalprice1")

      override def getKeyFromData(t: (String, String, Double)): String = t._1

      override def getValueFromData(t: (String, String, Double)): String = t._3.toString
    }))

    //    2. While task 1 is running, also monitor the stream: records whose ORDERSTATUS field is F are written to MySQL
    val outputStream2: DataStream[String] = dataStream
      .getSideOutput(outputTag)
    outputStream2.addSink(new RichSinkFunction[String] {
      var connection: Connection = _
      var preparedStatement: PreparedStatement = _

      override def open(parameters: Configuration): Unit = {
        connection = DriverManager.getConnection("jdbc:mysql://huangshuang.xyz:3306/Data?characterEncoding=utf-8", "root", "Hs433125-")
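        // alarmdata is assumed to be a single-column table storing the raw record line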
        preparedStatement = connection.prepareStatement("insert into alarmdata values (?)")
      }

      override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
        preparedStatement.setString(1, value)
        preparedStatement.executeUpdate()
      }

      override def close(): Unit = {
        // close the statement before the connection that owns it
        preparedStatement.close()
        connection.close()
      }
    })

    //    3. Use Flink to consume the Kafka data and count the number of orders placed per minute
    val outputStream3: DataStream[(String, Int)] = dataStream
      .map(_ => ("totalorder", 1)) // one count per order, keyed by a constant for a running total
      .keyBy(_._1)
      .sum(1)
    outputStream3.addSink(new RedisSink[(String, Int)](config, new RedisMapper[(String, Int)] {
      override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.SET)

      override def getKeyFromData(t: (String, Int)): String = t._1

      override def getValueFromData(t: (String, Int)): String = t._2.toString
    }))

    //    4. Write Scala code that uses Flink to consume the Kafka data, compute all users' per-minute order count and total amount, then derive the average order amount per minute
    val outputStream4: DataStream[(String, Double)] = dataStream
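      // one-minute tumbling window over the whole stream; with no time characteristic configured this is processing time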
      .timeWindowAll(Time.minutes(1))
      .process(new ProcessAllWindowFunction[(String, String, Double), (String, Double), TimeWindow] {
        override def process(context: Context, elements: Iterable[(String, String, Double)], out: Collector[(String, Double)]): Unit = {
          val count: Int = elements.size
          val sum: Double = elements.map(_._3).sum
          out.collect(("avgprice", sum / count))
        }
      })
    outputStream4.addSink(new RedisSink[(String, Double)](config, new RedisMapper[(String, Double)] {
      override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.SET)

      override def getKeyFromData(t: (String, Double)): String = t._1

      override def getValueFromData(t: (String, Double)): String = t._2.toString
    }))

    //    5. Use Flink to consume the Kafka data and keep a real-time running total of all order amounts
    val outputStream5: DataStream[(String, Double)] = dataStream
      .map(data => ("totalprice2", data._3))
      .keyBy(_._1)
      .sum(1)
    outputStream5.addSink(new RedisSink[(String, Double)](config, new RedisMapper[(String, Double)] {
      override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.SET)

      override def getKeyFromData(t: (String, Double)): String = t._1

      override def getValueFromData(t: (String, Double)): String = t._2.toString
    }))

    env.execute()
  }
}
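
The source gives no sample input, but the parsing above implies records of the shape O,<userId>,<status>,<amount>. Below is a minimal Scala producer sketch for smoke-testing the job against the same broker and topic; the record layout and the sample values are assumptions inferred from the code, not taken from the source.

package com.shtd.flink

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Test-data producer. The "O,<userId>,<status>,<amount>" layout is inferred from
// the job's split(",") indexing and is an assumption, not confirmed by the source.
object OrderProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "huangshuang.xyz:9092")
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // one completed order plus one cancelled ("F") order, which should also reach MySQL
    Seq("O,user1,C,100.5", "O,user1,F,20.0")
      .foreach(line => producer.send(new ProducerRecord[String, String]("order", line)))
    producer.flush()
    producer.close()
  }
}

Once a few records are in, the results show up in Redis under the keys the job writes: the hash totalprice1, plus the string keys totalorder, avgprice and totalprice2.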
