什么是RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。
代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
在Spark中创建RDD的创建方式可以分为三种:从集合中创建RDD、从外部存储创建RDD、从其他RDD创建。
环境准备
创建一个java的maven项目。
添加scala支持。
在pom文件中添加spark-core的依赖和scala的编译插件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.1.3</version> </dependency> </dependencies>
<build> <finalName>SparkCoreTest</finalName> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.4.6</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
|
RDD创建
在Spark中创建RDD的创建方式可以分为三种:从集合中创建RDD、从外部存储创建RDD、从其他RDD创建。
从集合中创建RDD:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| package cn.kittybunny
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}
object Test01 { def main(args: Array[String]): Unit = { // 1.创建sc的配置对象 val conf: SparkConf = new SparkConf().setAppName("sparkCore").setMaster("local[*]")
// 2. 创建sc对象 val sc = new SparkContext(conf)
// 3. 编写任务代码 val list = List(1, 2, 3, 4)
// 从集合创建rdd val intRDD: RDD[Int] = sc.parallelize(list) intRDD.collect().foreach(println)
// 底层调用parallelize 推荐使用 比较好记 val intRDD1: RDD[Int] = sc.makeRDD(list) intRDD1.collect().foreach(println)
// 4.关闭sc sc.stop() }
}
|
从文件获取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| // 1.创建sc的配置对象 val conf: SparkConf = new SparkConf() .setAppName("sparkCore").setMaster("local[*]")
// 2. 创建sc对象 val sc = new SparkContext(conf)
// 3. 编写任务代码 // 不管文件中存的是什么数据 读取过来全部当做字符串处理 val lineRDD: RDD[String] = sc.textFile("input/1.txt")
lineRDD.collect().foreach(println)
// 4.关闭sc sc.stop()
|
分区规则
Action行动算子
RDD序列化
RDD依赖关系
RDD持久化
键值对RDD数据分区
累加器
广播变量
样例