5.1.3惰性机制 所谓的“惰性机制”是指,整个转换过程只是记录了转换的轨迹,并不会发生真 正的计算,只有遇到行动操作时,才会触发“从头到尾”的真正的计算 这里给出一段简单的语句来解释 Spark的惰性机制 scala> val lines sc textFile(data. txt) scala> val lineLengths lines map(s=>slength) scala> val totalLength= lineLengths reduce((a, b)=>a+b) Spark编程基础》 厦门大学计算机科学系 林子雨 ziyulin@xmu.edu.cn
《Spark编程基础》 厦门大学计算机科学系 林子雨 ziyulin@xmu.edu.cn 5.1.3 惰性机制 scala> val lines = sc.textFile("data.txt") scala> val lineLengths = lines.map(s => s.length) scala> val totalLength = lineLengths.reduce((a, b) => a + b) 所谓的“惰性机制”是指,整个转换过程只是记录了转换的轨迹,并不会发生真 正的计算,只有遇到行动操作时,才会触发“从头到尾”的真正的计算 这里给出一段简单的语句来解释Spark的惰性机制
5.14持久化 在 Spark中,RDD釆用惰性求值的机制,每次遇到行动操作,都会从头开始执 行计算。每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计 算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据 下面就是多次计算同一个RDD的例子: scala> val list List("Hadoop","Spark","Hive") list: List[ String]= List(Hadoop, Spark, Hive) scala> val rdd= sc parallelize (list) rdd: org. apache. spark rdd RDD[String]= ParallelCollectionRDD[22] at parallelize at <console>: 29 scala> printIn(rdd. counto)∥行动操作,触发一次真正从头到尾的计算 scaa> printin( rdd collect0. mkString(",")∥行动操作,触发一次真正从头到尾 的计算 Hadoop, Spark, Hive Spark编程基础》 厦门大学计算机科学系 林子雨 ziyulin@xmu.edu.cn
《Spark编程基础》 厦门大学计算机科学系 林子雨 ziyulin@xmu.edu.cn 5.1.4 持久化 在Spark中,RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执 行计算。每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计 算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据 下面就是多次计算同一个RDD的例子: scala> val list = List("Hadoop","Spark","Hive") list: List[String] = List(Hadoop, Spark, Hive) scala> val rdd = sc.parallelize(list) rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:29 scala> println(rdd.count()) //行动操作,触发一次真正从头到尾的计算 3 scala> println(rdd.collect().mkString(",")) //行动操作,触发一次真正从头到尾 的计算 Hadoop,Spark,Hive
5.1.3持久化 ·可以通过持久化(缓存)机制避免这种重复计算的开销 可以使用 persist方法对一个RDD标记为持久化 之所以说“标记为持久化”,是因为出现 persist语句的地 方,并不会马上计算生成RDD并把它持久化,而是要等到 遇到第一个行动操作触发真正计算以后,才会把计算结果进 行持久化 持久化后的RDD将会被保留在计算节点的内存中被后面的 行动操作重复使用 Spark编程基础》 厦门大学计算机科学系 林子雨 ziyulin@xmu.edu.cn
《Spark编程基础》 厦门大学计算机科学系 林子雨 ziyulin@xmu.edu.cn 5.1.3 持久化 •可以通过持久化(缓存)机制避免这种重复计算的开销 •可以使用persist()方法对一个RDD标记为持久化 •之所以说“标记为持久化”,是因为出现persist()语句的地 方,并不会马上计算生成RDD并把它持久化,而是要等到 遇到第一个行动操作触发真正计算以后,才会把计算结果进 行持久化 •持久化后的RDD将会被保留在计算节点的内存中被后面的 行动操作重复使用
5.1.3持久化 persist的圆括号中包含的是持久化级别参数: persist( MEMORY ONLY):表示将RDD作为反序列化的对 象存储于JⅥM中,如果内存不足,就要按照LRU原则替换 缓存中的内容 persist( MEMORY AND_DSK表示将RDD作为反序列化 的对象存储在JVM中,如果内存不足,超出的分区将会被 存放在硬盘上 般而言,使用 cache方法时,会调用 persist(MEMORY ONLY) 可以使用 unpersist()方法手动地把持久化的RDD从缓存中 移除 Spark编程基础》 厦门大学计算机科学系 林子雨 ziyulin@xmu.edu.cn
《Spark编程基础》 厦门大学计算机科学系 林子雨 ziyulin@xmu.edu.cn 5.1.3 持久化 persist()的圆括号中包含的是持久化级别参数: •可以使用unpersist()方法手动地把持久化的RDD从缓存中 移除 •persist(MEMORY_ONLY):表示将RDD作为反序列化的对 象存储于JVM中,如果内存不足,就要按照LRU原则替换 缓存中的内容 •persist(MEMORY_AND_DISK)表示将RDD作为反序列化 的对象存储在JVM中,如果内存不足,超出的分区将会被 存放在硬盘上 •一般而言,使用cache()方法时,会调用 persist(MEMORY_ONLY)
5.1.3持久化 针对上面的实例,增加持久化语句以后的执行过程如下: scala> val list= List(Hadoop","Spark","Hive") list: List[ String]=List(Hadoop, Spark, Hive) scala> val rdd=Sc parallelize list) rdd: org. apache. spark rdd RDD[String ] Parallel Collection RDD[22] at parallelize at <console>: 29 scala> rdd cache∥会调用 persist( MEMORY ONLY),但是,语句执行到这 里,并不会缓存rdd,因为这时rdd还没有被计算生成 scala> println( rdd. count(0)∥第一次行动操作,触发一次真正从头到尾的计算, 这时上面的 Idd cacheO才会被执行,把这个rdd放到缓存中 scala> println(rdd. collecto. mk String(")∥第二次行动操作,不需要触发从头 到尾的计算,只需要重复使用上面缓存中的rdd Hadoop, Spark, Hive Spark编程基础》 厦门大学计算机科学系 林子雨 ziyulin@xmu.edu.cn
《Spark编程基础》 厦门大学计算机科学系 林子雨 ziyulin@xmu.edu.cn 5.1.3 持久化 scala> val list = List("Hadoop","Spark","Hive") list: List[String] = List(Hadoop, Spark, Hive) scala> val rdd = sc.parallelize(list) rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:29 scala> rdd.cache() //会调用persist(MEMORY_ONLY),但是,语句执行到这 里,并不会缓存rdd,因为这时rdd还没有被计算生成 scala> println(rdd.count()) //第一次行动操作,触发一次真正从头到尾的计算, 这时上面的rdd.cache()才会被执行,把这个rdd放到缓存中 3 scala> println(rdd.collect().mkString(",")) //第二次行动操作,不需要触发从头 到尾的计算,只需要重复使用上面缓存中的rdd Hadoop,Spark,Hive 针对上面的实例,增加持久化语句以后的执行过程如下: