博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SparkRdd-scala版本
阅读量:3963 次
发布时间:2019-05-24

本文共 3591 字,大约阅读时间需要 11 分钟。

需知

介绍一下什么是rdd;rdd就是一个容器;List,set,map也是容器,区别:list,set,map是单机的,把数据放到内存中,内存有上限;rdd:里面存储的数据无上限;(因为它是集群,把所有的电脑的内存都连接起来)

容器里面的方法(CRUD);
Sql语句:也是操作容器里面的数据,而且功能特别强大;
所有的功能都集于rdd一身;
方法(算子);stream编程;

为什么要学习rdd

List,set,map是单机的,把数据放到内存中,内存有上限;

Mr:但是它比较慢;

Rdd

Rdd:无上限的容器,分布式容器

Rdd:(弹性分布式数据集)–>resilient distributed dataset
它是可以并行(多线程)操作的元素的容错(有错误可以修复)集合(容器)

五大特性:

A list of partitions:(分区列表)

A function for computing each split(计算每个分割的函数)
A list of dependencies on other RDDs;(对其他RDD的依赖关系列表)
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)(可选地,key - value RDD的分区器(例如,说RDD是散列分区的))
Optionally, a list of preferred locations to compute each split on (e.g. block locations foran HDFS file)(可选地,要计算每个拆分的首选位置列表(例如一个HDFS文件)

Rdd操作

操作分为两类

Transformations(传输,转换)算子;返回值还是rdd(Stream)
Action(行为,行动);算子;;返回值还是非rdd(非Stream)
Transformations算子是懒惰的,必须得有一个action算子才能执行(数据流里面action算子只能调用一次,在spark里面可以调用多次)
Spark大哥和小弟之间传输的是代码,而不是数据(因为数据太大)
闭包不建议对闭包外面的变量进行修改(可以使用)

API操作

前期准备

/* java连接Hbase */      System.setProperty("hadoop.home.dir", "E:/帮助文档/大数据/hadoop-3.2.1");      /* Spark的程序;我们这是在Spark的基础之上进行二次开发 */      /* === SparkContext === */      /* 创建SparkConf       * bin/spark-submit 后面可以跟上一些参数       *  */      var conf = new SparkConf();      /* 设置一个大哥 */      conf.setMaster("local[*]");      conf.setAppName("MyHw");      /* 参数是SparkConf */      var sc = new SparkContext(conf);      /* 调用方法 */      //makeRdd1(sc);            /* 根据外部系统的文件集创建rdd */      makeRdd2(sc);            /* 一个jvm里面只能一个sc;当运行完以后,要调用停止的方法 */      sc.stop()
var arr = Array(10,20,15,30,50,60);    /* 创建一个rdd     * 参数1:Seq:List,Set,Map,数组     *  */    var arrRdd = sc.parallelize(arr);    /* 查看这个rdd里面的元素 */    var collectArr = arrRdd.collect() ;     println("collectArr:" + Arrays.toString(collectArr));    /* 获取元素 */    arrRdd.foreach( t => println("===foreach循环==" + t ));    /* 简化 */    arrRdd.foreach( println(_) );    println("===分区===");    /* 创建rdd的时候可以有多个分区(一个分区一个任务)     * 参数2:是分区     *  */    var arrParaRdd = sc.parallelize(arr, 2);    /* 查看元素 */    collectArr = arrParaRdd.collect() ;      println("collectArr:" + Arrays.toString(collectArr));    /* 获取元素 */    arrParaRdd.foreach( t => println("===foreach循环==" + t ));    /* 要想获取这个集合中的数据如何分区 */    arrParaRdd.foreachPartition( t =>    {      /* 查看类型 */      println("==foreachPartition==" + t.getClass.getSimpleName );      t.foreach( t1 =>       {        print("===循环==>" + t1 + "\t");       });      println("==结束===");    });     /* 查看分区数 */    println("==getNumPartitions===" + arrParaRdd.getNumPartitions) ;   }    /**   * 创建rdd   */  def makeRdd2(sc:SparkContext) : Unit =  {    println("==makeRdd2===");    /* 文件路径:绝对路径     * 默认是本地路径:file:/E:/班级/大数据_20191211/20191230_spark_rdd/02_代码/01-rdd/src/data.txt     * 		如果是集群,保证这个文件在所有的集群上都要有一份;     * 		这里面的路径支持*通配符     * SequenceFiles:这个是hadoop,hdfs支持的文件格式     * hdfs:支持上hdfs的路径     * 保存     *  */    var path = "file:/E:/班级/大数据_20191211/20191230_spark_rdd/02_代码/01-rdd/src/data.txt" ;     /* 参数1:文件的路径     * 参数2:最小的分区数     * 返回值是一个rdd:     * 		文件第一行当做rdd里面的元素     *  */    var textFileRdd = sc.textFile(path,3);    println("==textFileRdd==" + textFileRdd);    println("==getNumPartitions===" + textFileRdd.getNumPartitions) ;    /* 收集元素 */    var collectArr = textFileRdd.collect() ;     println("collectArr:" + collectArr);    /* 获取元素 */    textFileRdd.foreach( t => println("===foreach循环==" + t ));    /* 这个路径是一个目录,而不是文件夹 */    var tarPath = "file:/E:/班级/大数据_20191211/20191230_spark_rdd/02_代码/01-rdd/src/data_res.txt"		/* 将rdd存储到磁盘上,存储的是文本文件 */    textFileRdd.saveAsTextFile(tarPath);  }}

转载地址:http://rgrzi.baihongyu.com/

你可能感兴趣的文章
SQL - SQL Server查询近7天的连续日期
查看>>
SQL - SQL Server中如何取年、月、日 -DATEPART函数
查看>>
SQL - SQL Server 一列或多列重复数据的查询,删除
查看>>
NET - .NET Core WebAPI + Vue + Axios 导出Excel / CSV
查看>>
NET - NET Core Quartz.net开源作业调度框架使用详解
查看>>
NET - NET Core quartz.net 时间表达式----- Cron表达式详解
查看>>
NET - .NET Core 之 Abp Audit-Logging
查看>>
NET - .NET Core 之 Abp AuditLog 将不同的Controller实体的审计日志存储到不同的Table
查看>>
NET - .NET Core 之 Azure Key Vault 密钥保管库的使用
查看>>
NET - .NET Core 之 Abp 整合 Quartz
查看>>
Docker - Docker Desktop(WSL2)修改镜像存储位置
查看>>
NET - NET Core使用Log4net的SqlServer AdoNetAppender 报程序集错误
查看>>
NET - NET Core中使用Log4net输出日志到数据库中去
查看>>
NET - NET Core 迁移nuget包缓存到指定位置
查看>>
Spring - SpringBoot 集成 swagger2
查看>>
SQL - 深入理解MySQL索引之B+Tree
查看>>
SQL - 数据库索引原理,及MySQL索引类型
查看>>
Spring - Dubbo的实现原理
查看>>
Spring - Dubbo 扩展点详解
查看>>
Spring - Hystrix原理与实战
查看>>