【变量】spark(六)变量共享:累加器和广播变量

导读:本篇文章讲解 【变量】spark(六)变量共享:累加器和广播变量,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

一. 累加器:

1.概述

获取全局唯一状态的对象。Driver和Executor之间共享变量,即Executor的计算结果可以返回到Driver中继续使用。全局级别:无论是哪个Task,修改的都是此累加器。

使用:
在这里插入图片描述

对于Executor来说,只能修改(增加)累加器的内容,不能读,只对Driver可读。
累加器是全局唯一的,每次操作只增不减。

 

2. 自定义累加器

case class PersonInfo(var personNum: Int, var ageSum: Int)

class MyAccumulator extends AccumulatorV2[PersonInfo, PersonInfo] {
  //给Driver端的初始值,累加器是在此基础上进行累加的
  private var info = PersonInfo(0, 0)

  //每个分区的累加器是否有初始值,要和reset方法保持一致
  override def isZero: Boolean = info.personNum == 0 && info.ageSum == 0

  //复制一个新的累加器
  override def copy(): AccumulatorV2[PersonInfo, PersonInfo] = {
    val acc = new MyAccumulator()
    acc.info = info
    acc
  }

  //reset初始值
  override def reset(): Unit = {
    info=PersonInfo(0,0)
  }

  //(各RDD分区内各自)累加
  override def add(v: PersonInfo): Unit = {
    info.personNum += v.personNum
    info.ageSum += v.ageSum
  }

  //(将每个分区的结果,)合并到Driver端
  override def merge(other: AccumulatorV2[PersonInfo, PersonInfo]): Unit = {
    val ac = other.asInstanceOf[MyAccumulator]
    info.personNum += ac.info.personNum
    info.ageSum += ac.info.ageSum
  }

  //累加器在Driver端最终返回的结果
  override def value: PersonInfo = info
}

object SelfAccumulatorTest {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local").setAppName("acc")
    val sc = new SparkContext(conf)
    val acc = new MyAccumulator()
    sc.register(acc)

    val list = sc.parallelize(Array[String]("a 1", "b 2", "c 3", "d 4", "e 5", "f 6"), 3)
    list.map(line => {
      val age = line.split(" ")(1).toInt
      acc.add(PersonInfo(1, age))
      line
    }).collect()
    println(s"结果:${acc.value}")

  }

 

二. 广播变量

分布式只读共享变量

1. 概述

背景:

每个Task在运行时都会拷贝一份数据副本,更好的保证了数据一致性,但是消耗大量内存,当数据量很大时,极易出现内存溢出(OOM)。

概念:

广播变量允许保留一个只读的变量缓存在每台机器上,而不是每个task上。
广播变量是由Driver发给当前Application分配的所有Executor(进程节点)内存级别的全局只读变量,Executor中线程池的线程共享该全局变量。
变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。

优点:

减少了网络传输(否则的话每个Task都要传输一次该变量,极大地节省了内存,提高CPU地有效工作。

使用

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
scala> broadcastVar.value
res33: Array[Int] = Array(1, 2, 3)

在这里插入图片描述

 
 
参考:
Spark自定义累加器

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/65341.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!