• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

akka模拟Spark的Master和Worker通信

武飞扬头像
Maverick_曲流觞
帮助4

利用akka模拟Spark的Master与Worker通信


Spark是一个基于内存计算的大数据处理框架,它提供了一个独立部署模式(Standalone),可以在自己的集群中运行Spark应用程序。在这种模式下,Spark有两种角色:Master和Worker。Master是集群的控制者,负责管理Worker的注册、注销、状态变更等,以及调度Driver和Executor的运行。Worker是集群的工作者,负责运行Driver和Executor,并向Master报告自己的状态和资源信息。

在本文中,我们将使用Akka框架来模拟Spark的Master与Worker通信过程。Akka是一个基于Actor模型的并发和分布式编程框架,它提供了一种简单而强大的抽象,让我们可以用异步消息来构建高性能、可扩展、容错的系统。

Master与Worker通信过程

我们将使用Akka来模拟以下几个步骤:

  1. 启动Master和Worker,并建立连接。
  2. Worker向Master注册自己的信息(内存、核数等)。
  3. Master收到Worker的注册信息后,回复注册成功的消息。
  4. Worker收到注册成功的消息后,启动一个定时任务,定期向Master发送心跳包(3秒)。
  5. Master收到心跳包后,更新Worker的状态信息。
  6. Master启动一个定时任务,检查Worker是否超时(30秒),如果超时,则删除Worker的信息。

学新通

消息类

首先,我们需要定义一些消息类型,用于在Master和Worker之间传递数据:

//使用样例类是因为方便序列化,模拟是在一台机器上,但多台机器时不行,所以要序列化
/**
 *样例类,表示从节点的注册信息
 * @param slave_id 从节点的Id
 * @param lastUpdateTime 上次更新心跳的时间
 * @param cores 从节点的核数
 * @param memory 从节点的内存大小
 */
case class RegisterClass(val slave_id:String,var lastUpdateTime:Long,val cores:Int,val memory:String)

/**
 * 样例对象,表示注册成功
 */
case object RegisterSuccess

/**
 * 样例类,表示心跳
 * @param slave_id 发送心跳的从节点ID
 */
case class HeartBeat(val slave_id:String)

/**
 * 样例对象,表示主节点检测从节点是否超时发送心跳
 */
case object CheckTimeOut
学新通

Master实现

然后,我们需要定义一个Master类,继承自Actor特质,并实现receive方法,用于处理收到的消息。Master类还需要一个preStart方法,在启动前执行一些初始化操作,例如启动定时任务检查Worker是否超时。
需要导入的包

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import java.util.concurrent.TimeUnit
import scala.collection.mutable
import scala.concurrent.duration.Duration
//import scala.collection.mutable.ListBuffer

class Master

class Master extends Actor {
  //val buffer: ListBuffer[RegisterClass] = ListBuffer[RegisterClass]()
  // 定义一个可变的映射变量,名为slaves ,初始化这个映射为空
  var slaves: mutable.Map[String, RegisterClass] = mutable.Map[String, RegisterClass]()

  // 重写receive方法,定义Actor可以处理的消息类型
  override def receive: Receive = {
    // 如果收到一个RegisterClass类型的消息,将其赋值给x
    case x: RegisterClass => {
      // 将x中的slave_id和RegisterClass对象放入slaves映射中
      slaves.put(x.slave_id, x)
      // 打印出当前注册的worker的数量
      println(s"worker:${x.slave_id} is registering................................................ current workers ${slaves.size}")
      // 向发送者回复一个RegisterSuccess消息
      sender() ! RegisterSuccess
    }
    // 如果收到HeartBeat类型的消息
    case x: HeartBeat => {
      // 从Map中获取slave actor的信息
      slaves.get(x.slave_id) match {
        // 如果存在该slave actor的信息
        case Some(value) => {
          // 更新该slave actor的最后更新时间
          value.lastUpdateTime = System.currentTimeMillis()
          // 将更新后的信息存储到Map中
          slaves.put(x.slave_id, value)
          // 打印一句话,表示该slave actor正常运行
          println(s"${x.slave_id} is responsive and functional. ")
          // 打印一句话,表示当前在线的slave actor数量
          println(s"Current online workers is ${slaves.size}")
        }
        // 如果不存在该slave actor的信息
        case None => {
          // 打印一句话,表示该slave actor不存在
          println(s"${x.slave_id} does not exist !!!")
        }
      }
    }
    // 如果收到CheckTimeOut消息
    case CheckTimeOut => {
      // 如果Map不为空
      if (slaves.nonEmpty) {
        // 过滤Map中的信息
        slaves = slaves.filter(tuple => {
          // 如果该slave actor的最后更新时间距离当前时间超过30秒
          if (System.currentTimeMillis() - tuple._2.lastUpdateTime > 30000) {
            // 打印一句话,表示该slave actor已经超时,并从Map中删除该信息
            println(s"${tuple._1} is timeout , removed from mater !!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
            false
          } else true
        })
      }
    }
  }


  // 重写preStart方法,该方法在Actor启动时被调用
  override def preStart(): Unit = {
    // 导入context中的dispatcher,用于执行定时任务
    import context.dispatcher
    // 使用context中的system调用scheduler方法,创建一个定时任务
    context.system.scheduler.schedule(
      // 定时任务的初始延迟时间为10秒
      Duration(10, TimeUnit.SECONDS),
      // 定时任务的执行间隔为10秒
      Duration(10, TimeUnit.SECONDS),
      // 定时任务的接收者为self,即当前Actor
      self,
      // 定时任务发送的消息为CheckTimeOut,用于检查超时
      CheckTimeOut )
  }
}

学新通

最后,我们需要定义一个Master对象,用于启动Master并创建Master实例。我们需要配置Master的地址和端口,并根据这些参数创建一个配置对象。然后,我们需要使用这个配置对象创建一个ActorSystem,并使用它创建一个Master实例。
object Master

object Master {
  def main(args: Array[String]): Unit = {
    // 定义一个配置字符串,包含actor的提供者,主机名和端口号
    val conf =
      """
        |akka.actor.provider = akka.remote.RemoteActorRefProvider
        |akka.remote.netty.tcp.hostname = localhost
        |akka.remote.netty.tcp.port = 8888
        |""".stripMargin
    // 解析配置字符串,得到一个配置对象
    val config = ConfigFactory.parseString(conf)
    // 根据配置对象创建一个actor系统,命名为Hadoop
    val actorSystem = ActorSystem("Hadoop",config)
    // 在actor系统中创建一个slave actor,命名为master
    actorSystem.actorOf(Props(new Master),"master")
  }
}
学新通

Worker实现

同样地,我们也需要定义一个Worker类,继承自Actor特质,并实现receive方法,用于处理收到的消息,并启动定时任务发送心跳的方法。所以还有写一个启动定时任务发送心跳的方法。Worker类也需要一个preStart方法,在启动前执行一些初始化操作,例如向Master注册自己的信息,并启动定时任务发送心跳包。
需要导入的包

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration

class Worker

class Slave extends Actor {
  // 定义一个slave_id变量,赋值为"slave1",表示从节点的id
  val slave_id = "slave1"
  // 定义一个cores变量,赋值为30,表示从节点的核心数
  val cores = 30
  // 定义一个memory变量,赋值为"64G",表示从节点的内存大小
  val memory = "64G"
  // 重写receive方法,它返回一个Receive类型的偏函数
  override def receive: Receive = {
    // 匹配RegisterSuccess消息,表示注册成功
    case RegisterSuccess =>
      // 打印一条信息,表示从节点注册成功
      println(s"${slave_id} register successfully!!!!!!!!!!")
      // 调用sendHeartBeat方法,向sender发送心跳消息
      sendHeartBeat()
  }
  // 这是一个sendHeartBeat方法,它没有参数和返回值
  def sendHeartBeat():Unit={
    /** initialDelay: FiniteDuration, // 这是一个有限的持续时间,表示第一次发送心跳的延迟时间
    interval: FiniteDuration, // 这是一个有限的持续时间,表示发送心跳的间隔时间
    receiver: ActorRef, // 这是一个actor引用,表示接收心跳的actor
    message: Any) // 这是一个任意类型的值,表示心跳的内容
     */
    // 导入context.dispatcher,它是一个执行器,用于执行定时任务
    import context.dispatcher
    // 调用context.system.scheduler.schedule方法,创建一个定时任务
    context.system.scheduler.schedule(
      // 设置第一次发送心跳的延迟时间为3秒
      Duration(3,TimeUnit.SECONDS),
      // 设置发送心跳的间隔时间为3秒
      Duration(3,TimeUnit.SECONDS),
      // 设置接收心跳的actor为sender,即调用该方法的actor
      sender(),
      // 设置心跳的内容为HeartBeat(slave_id),其中slave_id是一个变量,表示从节点的id
      HeartBeat(slave_id))
  }
  // 重写preStart方法,它没有参数和返回值
  override def preStart(): Unit = {
    // 创建一个RegisterClass对象,它包含从节点的id,注册时间,核心数和内存大小
    val register = RegisterClass(slave_id, System.currentTimeMillis(), cores, memory)
    // 创建一个proxy对象,它是一个actor选择器,用于选择远程的master actor
    val proxy = context.actorSelection("akka.tcp://Hadoop@localhost:8888/user/master")
    // 通过proxy向master actor发送register消息,表示请求注册
    proxy ! register
  }
}

学新通

最后,我们需要定义一个Worker对象,用于启动Worker并创建Worker实例。我们配置Worker和Master的地址和端口,以及Worker的核数和内存等参数,并根据这些参数创建一个配置对象。然后,我们需要使用这个配置对象创建一个ActorSystem,并使用它创建一个Worker实例。
object Worker

object Slave {
  def main(args: Array[String]): Unit = {
    // 定义一个配置字符串,包含actor的提供者,主机名和端口号
    val conf =
      """
        |akka.actor.provider = akka.remote.RemoteActorRefProvider
        |akka.remote.netty.tcp.hostname = localhost
        |akka.remote.netty.tcp.port = 6666
        |""".stripMargin
    // 解析配置字符串,得到一个配置对象
    val config = ConfigFactory.parseString(conf)
    // 根据配置对象创建一个actor系统,命名为Hadoop2
    val actorSystem = ActorSystem("Hadoop2",config)
    // 在actor系统中创建一个slave actor,命名为slave1
    actorSystem.actorOf(Props(new Slave),"slave1")
  }
}

学新通

运行master

学新通

运行第一个slave

学新通

查看master

学新通

运行第二个slave

学新通

点击Edit Configurations

学新通

点击Modify options

学新通

选择Allow

然后点击apply

修改为slave2,端口改为6662

学新通

学新通

学新通

查看master

学新通

运行三个slave

修改同slave2

学新通

查看master

学新通

关闭slave3

学新通

等待30秒查看结果

学新通

运行结果
我们可以分别运行Master和Slave对象,观察控制台输出。我们可以看到以下结果:

Master启动,Slave1启动后

Master的控制台打印出"worker:slave1 is registering… current workers 1"

Slave1的控制台打印出"slave1 register successfully!!!"

表明slave1注册成功,随后master控制台打印出

"slave1 is responsive and functional.
Current online workers is 1
slave1 is responsive and functional.
Current online workers is 1
slave1 is responsive and functional.
Current online workers is 1

…"

表明slave1按时发送心跳检测



slave2启动

Master的控制台打印出"worker:slave2 is registering… current workers 2"

Slave2的控制台打印出"slave2 register successfully!!!"

表明slave2注册成功,随后master控制台打印出

"slave1 is responsive and functional.
Current online workers is 2
slave2 is responsive and functional.
Current online workers is 2
slave1 is responsive and functional.
Current online workers is 2
slave2 is responsive and functional.
Current online workers is 2

…"

表明slave1和slave2按时发送心跳检测



slave3启动

Master的控制台打印出"worker:slave3 is registering… current workers 3"

Slave3的控制台打印出"slave3 register successfully!!!"

表明slave3注册成功,随后master控制台打印出

“slave1 is responsive and functional.
Current online workers is 3
slave2 is responsive and functional.
Current online workers is 3
slave3 is responsive and functional.
Current online workers is 3
slave1 is responsive and functional.
Current online workers is 3
slave2 is responsive and functional.
Current online workers is 3
slave3 is responsive and functional.
Current online workers is 3
…”

表明slave1,slave2和slave3按时发送心跳检测



slave3关闭

Master的控制台打印出"slave1 is responsive and functional.
Current online workers is 3
slave2 is responsive and functional.
Current online workers is 3
slave1 is responsive and functional.
Current online workers is 3
slave2 is responsive and functional.
Current online workers is 3
slave1 is responsive and functional.
Current online workers is 3
slave2 is responsive and functional.
Current online workers is 3
…"
可以看出此时只有slave1和slave2在发送心跳,但Current online workers is 3,说明master还没有检测出slave3已经关闭.
master心跳机制检测出来后,master控制台打印出
“slave3 is timeout , removed from mater !!!
slave1 is responsive and functional.
Current online workers is 2
slave2 is responsive and functional.
Current online workers is 2
slave1 is responsive and functional.
Current online workers is 2
slave2 is responsive and functional.
Current online workers is 2
…”

表明slave1,slave2按时发送心跳检测,slave3已经超时,删除

总结

本文介绍了如何使用Akka模拟Spark的Master与Worker通信过程。首先定义了一些消息类型,然后分别实现了Master和Worker类,并使用ActorSystem来创建和管理它们。我们还使用ActorRef、ActorSelection、Props等对象来发送和接收消息。我们模拟了Worker向Master注册、Master回复注册成功、Worker发送心跳、Master更新状态信息、Master检查Worker是否超时等步骤。Akka是一个强大的框架,可以帮助我们构建高并发、分布式和容错的系统。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfhkcji
系列文章
更多 icon
同类精品
更多 icon
继续加载