本文共 3591 字,大约阅读时间需要 11 分钟。
介绍一下什么是rdd;rdd就是一个容器;List,set,map也是容器,区别:list,set,map是单机的,把数据放到内存中,内存有上限;rdd:里面存储的数据无上限;(因为它是集群,把所有的电脑的内存都连接起来)
容器里面的方法(CRUD); Sql语句:也是操作容器里面的数据,而且功能特别强大; 所有的功能都集于rdd一身; 方法(算子);stream编程;List,set,map是单机的,把数据放到内存中,内存有上限;
Mr:但是它比较慢;Rdd:无上限的容器,分布式容器
Rdd:(弹性分布式数据集)–>resilient distributed dataset 它是可以并行(多线程)操作的元素的容错(有错误可以修复)集合(容器)A list of partitions:(分区列表)
A function for computing each split(计算每个分割的函数) A list of dependencies on other RDDs;(对其他RDD的依赖关系列表) Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)(可选地,key - value RDD的分区器(例如,说RDD是散列分区的)) Optionally, a list of preferred locations to compute each split on (e.g. block locations foran HDFS file)(可选地,要计算每个拆分的首选位置列表(例如一个HDFS文件)操作分为两类
Transformations(传输,转换)算子;返回值还是rdd(Stream) Action(行为,行动);算子;;返回值还是非rdd(非Stream) Transformations算子是懒惰的,必须得有一个action算子才能执行(数据流里面action算子只能调用一次,在spark里面可以调用多次) Spark大哥和小弟之间传输的是代码,而不是数据(因为数据太大) 闭包不建议对闭包外面的变量进行修改(可以使用)/* java连接Hbase */ System.setProperty("hadoop.home.dir", "E:/帮助文档/大数据/hadoop-3.2.1"); /* Spark的程序;我们这是在Spark的基础之上进行二次开发 */ /* === SparkContext === */ /* 创建SparkConf * bin/spark-submit 后面可以跟上一些参数 * */ var conf = new SparkConf(); /* 设置一个大哥 */ conf.setMaster("local[*]"); conf.setAppName("MyHw"); /* 参数是SparkConf */ var sc = new SparkContext(conf); /* 调用方法 */ //makeRdd1(sc); /* 根据外部系统的文件集创建rdd */ makeRdd2(sc); /* 一个jvm里面只能一个sc;当运行完以后,要调用停止的方法 */ sc.stop()
var arr = Array(10,20,15,30,50,60); /* 创建一个rdd * 参数1:Seq:List,Set,Map,数组 * */ var arrRdd = sc.parallelize(arr); /* 查看这个rdd里面的元素 */ var collectArr = arrRdd.collect() ; println("collectArr:" + Arrays.toString(collectArr)); /* 获取元素 */ arrRdd.foreach( t => println("===foreach循环==" + t )); /* 简化 */ arrRdd.foreach( println(_) ); println("===分区==="); /* 创建rdd的时候可以有多个分区(一个分区一个任务) * 参数2:是分区 * */ var arrParaRdd = sc.parallelize(arr, 2); /* 查看元素 */ collectArr = arrParaRdd.collect() ; println("collectArr:" + Arrays.toString(collectArr)); /* 获取元素 */ arrParaRdd.foreach( t => println("===foreach循环==" + t )); /* 要想获取这个集合中的数据如何分区 */ arrParaRdd.foreachPartition( t => { /* 查看类型 */ println("==foreachPartition==" + t.getClass.getSimpleName ); t.foreach( t1 => { print("===循环==>" + t1 + "\t"); }); println("==结束==="); }); /* 查看分区数 */ println("==getNumPartitions===" + arrParaRdd.getNumPartitions) ; } /** * 创建rdd */ def makeRdd2(sc:SparkContext) : Unit = { println("==makeRdd2==="); /* 文件路径:绝对路径 * 默认是本地路径:file:/E:/班级/大数据_20191211/20191230_spark_rdd/02_代码/01-rdd/src/data.txt * 如果是集群,保证这个文件在所有的集群上都要有一份; * 这里面的路径支持*通配符 * SequenceFiles:这个是hadoop,hdfs支持的文件格式 * hdfs:支持上hdfs的路径 * 保存 * */ var path = "file:/E:/班级/大数据_20191211/20191230_spark_rdd/02_代码/01-rdd/src/data.txt" ; /* 参数1:文件的路径 * 参数2:最小的分区数 * 返回值是一个rdd: * 文件第一行当做rdd里面的元素 * */ var textFileRdd = sc.textFile(path,3); println("==textFileRdd==" + textFileRdd); println("==getNumPartitions===" + textFileRdd.getNumPartitions) ; /* 收集元素 */ var collectArr = textFileRdd.collect() ; println("collectArr:" + collectArr); /* 获取元素 */ textFileRdd.foreach( t => println("===foreach循环==" + t )); /* 这个路径是一个目录,而不是文件夹 */ var tarPath = "file:/E:/班级/大数据_20191211/20191230_spark_rdd/02_代码/01-rdd/src/data_res.txt" /* 将rdd存储到磁盘上,存储的是文本文件 */ textFileRdd.saveAsTextFile(tarPath); }}
转载地址:http://rgrzi.baihongyu.com/