• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

星火广播杰克逊ObjectMapper

用户头像
it1352
帮助6

问题说明

我有一个火花应用程序,它读取一个文件的线条和尝试使用杰克逊反序列化它们。
为了得到这个code的工作,我需要定义Map操作(否则我得到一个NullPointerException异常)。

I have a spark application which reads lines from a files and tries to deserialize them using jackson. To get this code to work, I needed to define the ObjectMapper inside the Map operation (otherwise I got a NullPointerException).

我有以下的code这是工作的:

I have the following code which is working:

val alertsData = sc.textFile(rawlines).map(alertStr => {
      val mapper = new ObjectMapper()
      mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
      mapper.registerModule(DefaultScalaModule)
      broadcastVar.value.readValue(alertStr, classOf[Alert])
    })

不过,如果我定义映射器的地图之外,并播放它时,出现一个NullPointerException异常。

However, If I define the mapper outside the map and broadcast it, it fails with a NullPointerException.

这code失败:

val mapper = new ObjectMapper()
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.registerModule(DefaultScalaModule)
    val broadcastVar = sc.broadcast(mapper)

    val alertsData = sc.textFile(rawlines).map(alertStr => {
      broadcastVar.value.readValue(alertStr, classOf[Alert])
    })

我是什么在这里失踪?

What am I missing here?

谢谢,
阿里扎

Thanks, Aliza

正确答案

#1

原来你的可以的广播映射器。有问题的部分是 mapper.registerModule(DefaultScalaModule)这就需要将执行每个从站(执行器)的机器上,而不是仅在驱动程序。

It turns out you can broadcast the mapper. The problematic part was mapper.registerModule(DefaultScalaModule) which needs to be execute on each slave (executor) machine and not only on the driver.

所以这code工作:

val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
val broadcastVar = sc.broadcast(mapper)

val alertsData = sc.textFile(rawlines).map(alertStr => {
      broadcastVar.value.registerModule(DefaultScalaModule)
      broadcastVar.value.readValue(alertStr, classOf[Alert])
})

我运行registerModule每个分区只有一次进一步优化了code(而不是在每个RDD元素)。

I further optimised the code by running registerModule only once per partition (and not for each element in the RDD).

val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

val broadcastVar = sc.broadcast(mapper)
val alertsRawData = sc.textFile(rawlines)

val alertsData = alertsRawData.mapPartitions({ iter: Iterator[String] => broadcastVar.value.registerModule(DefaultScalaModule)
      for (i <- iter) yield broadcastVar.value.readValue(i, classOf[Alert]) })

阿里扎

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /reply/detail/tanhcakkcf
系列文章
更多 icon
同类精品
更多 icon
继续加载