
Spark: From Beginner to Mastery


Environment Setup

Preparation

Create the installation directory
mkdir /opt/soft
cd /opt/soft
Download Scala
wget https://downloads.lightbend.com/scala/2.13.10/scala-2.13.10.tgz -P /opt/soft
Extract Scala
tar -zxvf scala-2.13.10.tgz
Rename the Scala directory
mv scala-2.13.10 scala-2
Download Spark
wget https://dlcdn.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3-scala2.13.tgz -P /opt/soft
Extract Spark
tar -zxvf spark-3.4.0-bin-hadoop3-scala2.13.tgz
Rename the Spark directory
mv spark-3.4.0-bin-hadoop3-scala2.13 spark3
Edit the environment variables
vim /etc/profile
export JAVA_HOME=/opt/soft/jdk8
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export ZOOKEEPER_HOME=/opt/soft/zookeeper

export HADOOP_HOME=/opt/soft/hadoop3

export HADOOP_INSTALL=${HADOOP_HOME}
export HADOOP_MAPRED_HOME=${HADOOP_HOME}
export HADOOP_COMMON_HOME=${HADOOP_HOME}
export HADOOP_HDFS_HOME=${HADOOP_HOME}
export YARN_HOME=${HADOOP_HOME}
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop

export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root

export HIVE_HOME=/opt/soft/hive3
export HCAT_HOME=/opt/soft/hive3/hcatalog

export SQOOP_HOME=/opt/soft/sqoop-1

export FLUME_HOME=/opt/soft/flume

export HBASE_HOME=/opt/soft/hbase2

export PHOENIX_HOME=/opt/soft/phoenix

export SCALA_HOME=/opt/soft/scala-2

export SPARK_HOME=/opt/soft/spark3
export SPARKPYTHON=/opt/soft/spark3/python

export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HIVE_HOME/bin:$HCAT_HOME/bin:$SQOOP_HOME/bin:$FLUME_HOME/bin:$HBASE_HOME/bin:$PHOENIX_HOME/bin:$SCALA_HOME/bin:$SPARK_HOME/bin:$SPARK_HOME/sbin:$SPARKPYTHON
source /etc/profile

Local mode

Scala / Java
Start:
spark-shell


Web UI: http://spark01:4040
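A quick sanity check (my own minimal snippet, not from the original course material): paste the following into spark-shell; the resulting job will also show up in the Web UI above.

// the SparkContext is already available in spark-shell as `sc`
val nums = sc.parallelize(1 to 100)           // build an RDD from a local range
val evenSum = nums.filter(_ % 2 == 0).sum()   // filter is a transformation, sum is an action
println(s"sum of even numbers in 1..100 = $evenSum")  // expected: 2550.0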

[Screenshot: spark-shell running in local mode]

Exit:
:quit


pyspark
Start:
pyspark


Web UI: http://spark01:4040


Exit:
quit() or Ctrl-D


Submitting an application in local mode

Run the following from the Spark installation directory:

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_2.13-3.4.0.jar \
10
  1. --class specifies the main class of the application to run; replace it with the main class of your own program.
  2. --master local[2] sets the deploy mode; the default is local mode, and the number is how many virtual CPU cores to allocate.
  3. spark-examples_2.13-3.4.0.jar is the jar that contains the application class; in practice, point this at the jar you build yourself (a sketch follows this list).
  4. The trailing 10 is the application's input argument, used here to set the number of tasks for the job.
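For reference, here is a minimal, hypothetical Scala application of the kind you could substitute for --class and the example jar above. The package, object name, and Monte-Carlo logic are my own illustration, not part of the official Spark examples; after packaging it into your own jar, it would be submitted the same way as SparkPi.

package com.example.demo // hypothetical package, not part of the course code

import org.apache.spark.{SparkConf, SparkContext}

object MyPi {
  def main(args: Array[String]): Unit = {
    // the master URL is supplied by spark-submit (--master), so it is not hard-coded here
    val sc = new SparkContext(new SparkConf().setAppName("MyPi"))
    // the trailing program argument (the "10" above) plays the same role here: number of slices
    val slices = if (args.nonEmpty) args(0).toInt else 2
    val n = 100000 * slices
    // Monte-Carlo estimate of Pi: count random points that fall inside the unit circle
    val inside = sc.parallelize(1 to n, slices).map { _ =>
      val x = scala.util.Random.nextDouble() * 2 - 1
      val y = scala.util.Random.nextDouble() * 2 - 1
      if (x * x + y * y <= 1) 1 else 0
    }.reduce(_ + _)
    println(s"Pi is roughly ${4.0 * inside / n}")
    sc.stop()
  }
}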

Standalone mode

Edit the core configuration file

In the conf directory:

cd /opt/soft/spark3/conf
cp spark-env.sh.template spark-env.sh
vim spark-env.sh
export JAVA_HOME=/opt/soft/jdk8
export HADOOP_HOME=/opt/soft/hadoop3
export HADOOP_CONF_DIR=/opt/soft/hadoop3/etc/hadoop
export JAVA_LIBRARY_PATH=/opt/soft/hadoop3/lib/native

export SPARK_MASTER_HOST=spark01
export SPARK_MASTER_PORT=7077

export SPARK_WORKER_MEMORY=4g
export SPARK_WORKER_CORES=4
export SPARK_MASTER_WEBUI_PORT=6633

Edit the workers file
cp workers.template workers
vim workers
spark01
spark02
spark03

Configure history logging
cp spark-defaults.conf.template spark-defaults.conf
vim spark-defaults.conf
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://lihaozhe/spark-log
hdfs dfs -mkdir /spark-log
vim spark-env.sh
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080 
-Dspark.history.retainedApplications=30 
-Dspark.history.fs.logDirectory=hdfs://lihaozhe/spark-log"
Rename the start/stop scripts (to avoid clashing with Hadoop's start-all.sh / stop-all.sh)
mv sbin/start-all.sh sbin/start-spark.sh
mv sbin/stop-all.sh sbin/stop-spark.sh
Distribute to the other nodes
scp -r /opt/soft/spark3 root@spark02:/opt/soft
scp -r /opt/soft/spark3 root@spark03:/opt/soft
scp -r /etc/profile root@spark02:/etc
scp -r /etc/profile root@spark03:/etc

Refresh the environment variables on the other nodes

source /etc/profile
Start
start-spark.sh
start-history-server.sh
Web UI

http://spark01:6633


http://spark01:18080


Submit a job to the cluster
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://spark01:7077 \
./examples/jars/spark-examples_2.13-3.4.0.jar \
10


HA mode

Edit the core configuration file

In the conf directory:

cd /opt/soft/spark3/conf
cp spark-env.sh.template spark-env.sh
vim spark-env.sh
export JAVA_HOME=/opt/soft/jdk8
export HADOOP_HOME=/opt/soft/hadoop3
export HADOOP_CONF_DIR=/opt/soft/hadoop3/etc/hadoop
export JAVA_LIBRARY_PATH=/opt/soft/hadoop3/lib/native

SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER 
-Dspark.deploy.zookeeper.url=spark01:2181,spark02:2181,spark03:2181 
-Dspark.deploy.zookeeper.dir=/spark3"

export SPARK_WORKER_MEMORY=4g
export SPARK_WORKER_CORES=4
export SPARK_MASTER_WEBUI_PORT=6633

hdfs dfs -mkdir /spark3
Edit the workers file
cp workers.template workers
vim workers
spark01
spark02
spark03

Configure history logging
cp spark-defaults.conf.template spark-defaults.conf
vim spark-defaults.conf
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://lihaozhe/spark-log
hdfs dfs -mkdir /spark-log
vim spark-env.sh
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080 
-Dspark.history.retainedApplications=30 
-Dspark.history.fs.logDirectory=hdfs://lihaozhe/spark-log"
Rename the start/stop scripts (to avoid clashing with Hadoop's start-all.sh / stop-all.sh)
mv sbin/start-all.sh sbin/start-spark.sh
mv sbin/stop-all.sh sbin/stop-spark.sh
Distribute to the other nodes
scp -r /opt/soft/spark3 root@spark02:/opt/soft
scp -r /opt/soft/spark3 root@spark03:/opt/soft
scp -r /etc/profile root@spark02:/etc
scp -r /etc/profile root@spark03:/etc

Refresh the environment variables on the other nodes

source /etc/profile
Start
start-spark.sh
start-history-server.sh
Web UI

http://spark01:6633


http://spark01:18080


Submit a job to the cluster
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://spark01:7077 \
./examples/jars/spark-examples_2.13-3.4.0.jar \
10
Submit a job to YARN
bin/spark-submit --master yarn \
--class  org.apache.spark.examples.SparkPi ./examples/jars/spark-examples_2.13-3.4.0.jar 10


spark-code

spark-core

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.lihaozhe</groupId>
  <artifactId>spark-code</artifactId>
  <version>1.0.0</version>

  <properties>
    <jdk.version>1.8</jdk.version>
    <scala.version>2.13.10</scala.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>

    <commons-lang3.version>3.12.0</commons-lang3.version>
    <java-testdata-generator.version>1.1.2</java-testdata-generator.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>com.github.binarywang</groupId>
      <artifactId>java-testdata-generator</artifactId>
      <version>${java-testdata-generator.version}</version>
    </dependency>
    <!-- spark-core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.13</artifactId>
      <version>3.4.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.13</artifactId>
      <version>3.4.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>3.4.0</version>
    </dependency>
    <!-- junit-jupiter-api -->
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-api</artifactId>
      <version>5.9.3</version>
      <scope>test</scope>
    </dependency>
    <!-- junit-jupiter-engine -->
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-engine</artifactId>
      <version>5.9.3</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.26</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>2.14.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.3.5</version>
    </dependency>
    <!-- commons-pool2 -->
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-pool2</artifactId>
      <version>2.11.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.13</artifactId>
      <version>3.4.0</version>
    </dependency>

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.33</version>
    </dependency>
    <!-- commons-lang3 -->
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>${commons-lang3.version}</version>
    </dependency>
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>2.11.0</version>
    </dependency>
  </dependencies>
  <build>
    <finalName>${project.artifactId}</finalName>
    <!--<outputDirectory>../package</outputDirectory>-->
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.11.0</version>
        <configuration>
          <!-- compiler source encoding -->
          <encoding>UTF-8</encoding>
          <!-- JDK version used for compilation -->
          <source>${jdk.version}</source>
          <target>${jdk.version}</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-clean-plugin</artifactId>
        <version>3.2.0</version>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-resources-plugin</artifactId>
        <version>3.3.1</version>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-war-plugin</artifactId>
        <version>3.3.2</version>
      </plugin>
      <!-- compilation level -->
      <!-- skip JUnit tests when packaging -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.22.2</version>
        <configuration>
          <skip>true</skip>
        </configuration>
      </plugin>
      <!-- this plugin compiles Scala sources into class files -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>4.8.1</version>
        <configuration>
          <scalaCompatVersion>2.13</scalaCompatVersion>
          <scalaVersion>2.13.10</scalaVersion>
        </configuration>
        <executions>
          <execution>
            <goals>
              <goal>testCompile</goal>
            </goals>
          </execution>
          <execution>
            <id>compile-scala</id>
            <phase>compile</phase>
            <goals>
              <goal>add-source</goal>
              <goal>compile</goal>
            </goals>
          </execution>
          <execution>
            <id>test-compile-scala</id>
            <phase>test-compile</phase>
            <goals>
              <goal>add-source</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.5.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

hdfs-conf

Place the HDFS core configuration files core-site.xml and hdfs-site.xml in the resources directory.

The HDFS configuration files included here are those of the test cluster.

Because the production environment differs from the test environment, exclude the HDFS configuration files when packaging the project.

rdd
Building an RDD from a collection
package com.lihaozhe.course01;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

/**
 * 借助并行数据集 Parallelized Collections 构建 RDD
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */
public class JavaDemo01 {
    public static void main(String[] args) {
        String appName = "RDD";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // spark基础配置
        SparkConf conf = new SparkConf().setAppName(appName);
        // 本地运行
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // 数据集
            List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
            // 从集合中创建 RDD
            // Parallelized Collections
            JavaRDD<Integer> distData = sc.parallelize(data);
            // 将数据获取到本地driver
            List<Integer> result = distData.collect();
            // lambda 表达式
            result.forEach(System.out::println);
        }
    }
}

package com.lihaozhe.course01

import org.apache.spark.{SparkConf, SparkContext}

/**
 * 借助并行数据集 Parallelized Collections 构建 RDD
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */
object ScalaDemo01 {
  def main(args: Array[String]): Unit = {
    val appName = "rdd"
    // spark基础配置
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // 本地运行
    conf.setMaster("local")
    // 构建 SparkContext spark 上下文
    val sc = new SparkContext(conf)
    // 数据集
    val data = Array(1, 2, 3, 4, 5)
    // 从集合中创建 RDD
    // Parallelized Collections
    val distData = sc.parallelize(data)
    distData.foreach(println)
  }
}

Building an RDD from a local file

words.txt

linux shell
java mysql jdbc
hadoop hdfs mapreduce
hive presto
flume kafka
hbase phoenix
scala spark
sqoop flink
linux shell
java mysql jdbc
hadoop hdfs mapreduce
hive presto
flume kafka
hbase phoenix
scala spark
sqoop flink
base phoenix
scala spark
sqoop flink
linux shell
java mysql jdbc
hadoop hdfs mapreduce
java mysql jdbc
hadoop hdfs mapreduce
hive presto
flume kafka
hbase phoenix
scala spark
java mysql jdbc
hadoop hdfs mapreduce
java mysql jdbc
hadoop hdfs mapreduce
hive presto

package com.lihaozhe.course01;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

/**
 * 借助外部文件 External Datasets 构建 RDD
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */
public class JavaDemo02 {
    public static void main(String[] args) {
        String appName = "RDD";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // spark基础配置
        SparkConf conf = new SparkConf().setAppName(appName);
        // 本地运行
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // 从外部中创建 RDD
            // External Datasets
            // 使用本地文件系统
            JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/words.txt");
            // 将数据获取到本地driver
            List<String> lines = javaRDD.collect();
            // lambda 表达式
            lines.forEach(System.out::println);
        }
    }
}

package com.lihaozhe.course01

import org.apache.spark.{SparkConf, SparkContext}

/**
 * 借助外部文件 External Datasets 构建 RDD
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */
object ScalaDemo02 {
  def main(args: Array[String]): Unit = {
    val appName = "rdd"
    // spark基础配置
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // 本地运行
    conf.setMaster("local")
    // 构建 SparkContext spark 上下文
    val sc = new SparkContext(conf)
    // 从外部中创建 RDD
    // External Datasets
    // 使用本地文件系统
    val distFile = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/words.txt")
    distFile.foreach(println)
  }
}

Building an RDD from an HDFS file
package com.lihaozhe.course01;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.List;

/**
 * 借助外部文件 External Datasets 构建 RDD
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */
public class JavaDemo03 {
    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "root");
        String appName = "RDD";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // spark基础配置
        SparkConf conf = new SparkConf().setAppName(appName);
        // 本地运行
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // 从外部中创建 RDD
            // External Datasets
            // 使用本地文件系统
            // JavaRDD<String> javaRDD = sc.textFile("hdfs://spark01:8020/data/words.txt");
            JavaRDD<String> javaRDD = sc.textFile("/data/words.txt");
            // 将数据获取到本地driver
            List<String> lines = javaRDD.collect();
            // lambda 表达式
            lines.forEach(System.out::println);
        }
    }
}

package com.lihaozhe.course01

import org.apache.spark.{SparkConf, SparkContext}

/**
 * 借助外部文件 External Datasets 构建 RDD
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */
object ScalaDemo03 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val appName = "rdd"
    // spark基础配置
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // 本地运行
    conf.setMaster("local")
    // 构建 SparkContext spark 上下文
    val sc = new SparkContext(conf)
    // 从外部中创建 RDD
    // External Datasets
    // 使用HDFS文件系统
    // val distFile = sc.textFile("hdfs://spark01:8020/data/words.txt")
    val distFile = sc.textFile("/data/words.txt")
    distFile.foreach(println)
  }
}

Transformations and actions
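The operators below fall into two groups: transformations (map, filter, flatMap, groupByKey, reduceByKey, sortByKey, ...) are lazy and only describe a new RDD, while actions (count, take, collect, foreach, saveAsTextFile, ...) actually trigger a job. A tiny sketch of the difference, assuming an existing SparkContext named sc (my own illustration, not course code):

val rdd = sc.parallelize(1 to 5)
val doubled = rdd.map(_ * 2)        // transformation: nothing runs yet
val total = doubled.reduce(_ + _)   // action: a job executes now, total == 30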

data.csv

person3,137
person7,193
person7,78
person0,170
person5,145
person5,54
person5,150
person0,102
person0,15
person8,172
person6,177
person5,158
person8,30
person6,184
person5,50
person4,127
person1,197
person3,99
person7,2
person7,51
person9,27
person6,34
person0,18
person7,111
person2,34
person0,80
person3,19
person8,121
person1,38
person3,37
person8,69
person3,116
person5,14
person4,121
person7,13
person8,10
person4,67
person6,177
person8,161
person6,113
person5,161
person3,159
person5,161
person2,88
person3,191
person0,155
person4,55
person6,153
person6,187
person0,41
person3,157
person4,179
person4,95
person1,12
person3,109
person9,24
person9,188
person1,114
person7,9
person7,82
person8,47
person9,153
person7,152
person6,110
person2,73
person8,132
person4,175
person7,153
person9,174
person3,23
person3,103
person9,169
person8,98
person6,62
person2,33
person3,127
person1,91
person6,198
person4,28
person1,182
person0,164
person5,198
person7,22
person0,46
person3,5
person8,140
person3,131
person4,195
person7,86
person0,137
person8,152
person8,154
person7,144
person5,142
person9,147
person1,29
person5,113
person6,173
person6,115
person9,148
person2,114
person7,69
person6,192
person0,113
person5,26
person3,7
person1,2
person6,60
person8,38
person6,19
person4,5
person3,50
person9,179
person2,148
person0,23
person3,121
person9,66
person9,90
person4,166
person7,199
person0,79
person2,157
person5,98
person6,25
person1,100
person4,184
person6,124
person4,183
person3,105
person6,28
person5,141
person6,60
person2,108
person5,171
person7,98
person2,57
person9,18
person8,35
person7,141
person0,180
person2,176
person9,130
person2,26
person0,81
person6,144
person3,33
person4,41
person9,60
person1,99
person4,115
person6,83
person2,90
person7,174
person8,47
person5,62
person0,119
person9,99
person3,125
person3,20
person1,137
person9,74
person6,1
person4,140
person4,122
person1,56
person7,107
person9,131
person7,174
person7,191
person8,31
person4,45
person9,84
person6,38
person9,186
person6,89
person5,87
person9,80
person5,107
person3,175
person8,44
person0,114
person7,63
person3,129
person9,77
person9,86
person9,183
person3,61
person4,104
person2,192
person5,142
person4,124
person5,76
person0,187
person3,38
person7,62
person5,153
person9,149
person7,87
person7,27
person6,88

count
package com.lihaozhe.course02;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

/**
 * count 算子
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */
public class JavaDemo01 {
    public static void main(String[] args) {
        String appName = "count";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // spark基础配置
        SparkConf conf = new SparkConf().setAppName(appName);
        // 本地运行
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // 数据集
            List<Integer> data = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
            // 从集合中创建 RDD
            // Parallelized Collections
            JavaRDD<Integer> distData = sc.parallelize(data);
            long count = distData.count();
            System.out.println("count = "   count);
        }
    }
}

package com.lihaozhe.course02

import org.apache.spark.{SparkConf, SparkContext}

/**
 * count 算子
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */
object ScalaDemo01 {
  def main(args: Array[String]): Unit = {
    val appName = "count"
    // spark基础配置
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // 本地运行
    conf.setMaster("local")
    // 构建 SparkContext spark 上下文
    val sc = new SparkContext(conf)
    // 数据集
    val data = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    // 从集合中创建 RDD
    // Parallelized Collections
    val distData = sc.parallelize(data)
    val count = distData.count()
    println(s"count = ${count}")
  }
}

take
package com.lihaozhe.course02;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

/**
 * take 算子
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */
public class JavaDemo02 {
    public static void main(String[] args) {
        String appName = "take";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // spark基础配置
        SparkConf conf = new SparkConf().setAppName(appName);
        // 本地运行
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // 数据集
            List<Integer> data = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
            // 从集合中创建 RDD
            // Parallelized Collections
            JavaRDD<Integer> distData = sc.parallelize(data);
            List<Integer> topList = distData.take(3);
            topList.forEach(System.out::println);
        }
    }
}

package com.lihaozhe.course02

import org.apache.spark.{SparkConf, SparkContext}

/**
 * take 算子
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */
object ScalaDemo02 {
  def main(args: Array[String]): Unit = {
    val appName = "take"
    // spark基础配置
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // 本地运行
    conf.setMaster("local")
    // 构建 SparkContext spark 上下文
    val sc = new SparkContext(conf)
    // 数据集
    val data = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    // 从集合中创建 RDD
    // Parallelized Collections
    val distData = sc.parallelize(data)
    val top = distData.take(3)
    top.foreach(println)
  }
}

distinct
package com.lihaozhe.course02;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

/**
 * distinct 算子
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */
public class JavaDemo03 {
    public static void main(String[] args) {
        String appName = "distinct";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // spark基础配置
        SparkConf conf = new SparkConf().setAppName(appName);
        // 本地运行
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // 数据集
            List<Integer> data = Arrays.asList(0, 1, 5, 6, 7, 8, 9, 3, 4, 2, 4, 3);
            // 从集合中创建 RDD
            // Parallelized Collections
            JavaRDD<Integer> distData = sc.parallelize(data);
            JavaRDD<Integer> uniqueRDD = distData.distinct();
            List<Integer> uniqueList = uniqueRDD.collect();
            uniqueList.forEach(System.out::println);
        }
    }
}

package com.lihaozhe.course02

import org.apache.spark.{SparkConf, SparkContext}

/**
 * distinct 算子
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */
object ScalaDemo03 {
  def main(args: Array[String]): Unit = {
    val appName = "distinct"
    // spark基础配置
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // 本地运行
    conf.setMaster("local")
    // 构建 SparkContext spark 上下文
    val sc = new SparkContext(conf)
    // 数据集
    val data = Array(0, 1, 5, 6, 7, 8, 9, 3, 4, 2, 4, 3)
    // 从集合中创建 RDD
    // Parallelized Collections
    val distData = sc.parallelize(data)
    val uniqueData = distData.distinct()
    uniqueData.foreach(println)
  }
}

map
package com.lihaozhe.course02;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

/**
 * map 算子
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */
public class JavaDemo04 {
    public static void main(String[] args) {
        String appName = "map";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // spark基础配置
        SparkConf conf = new SparkConf().setAppName(appName);
        // 本地运行
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // 数据集
            List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
            // 从集合中创建 RDD
            // Parallelized Collections
            JavaRDD<Integer> distData = sc.parallelize(data);
            JavaRDD<Integer> rs = distData.map(num -> num * 2);
            List<Integer> list = rs.collect();
            list.forEach(System.out::println);
        }
    }
}

package com.lihaozhe.course02

import org.apache.spark.{SparkConf, SparkContext}

/**
 * map 算子
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */
object ScalaDemo04 {
  def main(args: Array[String]): Unit = {
    val appName = "map"
    // spark基础配置
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // 本地运行
    conf.setMaster("local")
    // 构建 SparkContext spark 上下文
    val sc = new SparkContext(conf)
    // 数据集
    val data = Array(1, 2, 3, 4, 5)
    // 从集合中创建 RDD
    // Parallelized Collections
    val distData = sc.parallelize(data)
    val rs = distData.map(_ * 2)
    rs.foreach(println)
  }
}

flatMap
package com.lihaozhe.course02;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * flatMap 算子
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */
public class JavaDemo05 {
    public static void main(String[] args) {
        String appName = "flatMap";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // spark基础配置
        SparkConf conf = new SparkConf().setAppName(appName);
        // 本地运行
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // 数据集
            List<String> data = Arrays.asList("hadoop hive presto","hbase phoenix","spark flink");
            // 从集合中创建 RDD
            // Parallelized Collections
            JavaRDD<String> javaRDD = sc.parallelize(data);
            // javaRDD.flatMap(new FlatMapFunction<String, Object>() {
            //     @Override
            //     public Iterator<Object> call(String s) throws Exception {
            //        return null;
            //    }
            // });
            JavaRDD<String> wordsRdd = javaRDD.flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).listIterator());
            List<String> words = wordsRdd.collect();
            words.forEach(System.out::println);
        }
    }
}

package com.lihaozhe.course02

import org.apache.spark.{SparkConf, SparkContext}

/**
 * flatMap 算子
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */
object ScalaDemo05 {
  def main(args: Array[String]): Unit = {
    val appName = "flatMap"
    // spark基础配置
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // 本地运行
    conf.setMaster("local")
    // 构建 SparkContext spark 上下文
    val sc = new SparkContext(conf)
    // 数据集
    val data = Array("hadoop hive presto","hbase phoenix","spark flink")
    // 从集合中创建 RDD
    // Parallelized Collections
    // ("hadoop","hive","presto","hbase","phoenix","spark","flink")
    val distData = sc.parallelize(data)
    val rs = distData.flatMap(_.split(" "))
    rs.foreach(println)
  }
}

filter
package com.lihaozhe.course02;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

/**
 * filter 算子
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */
public class JavaDemo06 {
    public static void main(String[] args) {
        String appName = "filter";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // spark基础配置
        SparkConf conf = new SparkConf().setAppName(appName);
        // 本地运行
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // 数据集
            List<Integer> data = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
            // 从集合中创建 RDD
            // Parallelized Collections
            JavaRDD<Integer> distData = sc.parallelize(data);
            JavaRDD<Integer> evenRDD = distData.filter(num -> num % 2 == 0);
            List<Integer> evenList = evenRDD.collect();
            evenList.forEach(System.out::println);
        }
    }
}

package com.lihaozhe.course02

import org.apache.spark.{SparkConf, SparkContext}

/**
 * filter 算子
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */
object ScalaDemo06 {
  def main(args: Array[String]): Unit = {
    val appName = "filter"
    // spark基础配置
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // 本地运行
    conf.setMaster("local")
    // 构建 SparkContext spark 上下文
    val sc = new SparkContext(conf)
    // 数据集
    val data = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    // 从集合中创建 RDD
    // Parallelized Collections
    val distData = sc.parallelize(data)
    val evenData = distData.filter(_ % 2 == 0)
    evenData.foreach(println)
  }
}

groupByKey
package com.lihaozhe.course02;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.List;

/**
 * groupByKey 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析每个人消费的金额数据汇总
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */
public class JavaDemo07 {
    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "root");
        String appName = "groupByKey";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // spark基础配置
        SparkConf conf = new SparkConf().setAppName(appName);
        // 本地运行
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // 从外部中创建 RDD
            // External Datasets
            // 使用本地文件系统
            // JavaRDD<String> javaRDD = sc.textFile("hdfs://spark01:8020/data/words.txt");
            JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv");
            // javaRDD.mapToPair(new PairFunction<String, String, Integer>() {
            //     @Override
            //     public Tuple2<String, Integer> call(String s) throws Exception {
            //         return null;
            //    }
            // });
            JavaPairRDD<String, Integer> javaPairRDD = javaRDD.mapToPair((PairFunction<String, String, Integer>) word -> {
                String[] words = word.split(",");
                return new Tuple2<String, Integer>(words[0], Integer.parseInt(words[1]));
            });
            JavaPairRDD<String, Iterable<Integer>> groupRDD = javaPairRDD.groupByKey();
            List<Tuple2<String, Iterable<Integer>>> collect = groupRDD.collect();
            collect.forEach(tup -> {
                System.out.print(tup._1 + " >>> (");
                tup._2.forEach(num -> System.out.print(num + ","));
                System.out.println("\b)");
            });

        }
    }
}

package com.lihaozhe.course02

import org.apache.spark.{SparkConf, SparkContext}

/**
 * groupByKey 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析每个人消费的金额数据汇总
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */
object ScalaDemo07 {
  def main(args: Array[String]): Unit = {
    val appName = "groupByKey"
    // spark基础配置
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // 本地运行
    conf.setMaster("local")
    // 构建 SparkContext spark 上下文
    val sc = new SparkContext(conf)
    // 从外部中创建 RDD
    // External Datasets
    // 使用本地文件系统
    val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    // distData.foreach(println)
    // (person3,137)
    val tupleData = distData.map(line => (line.split(",")(0), line.split(",")(1)))
    // (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))
    val groupData = tupleData.groupByKey()
    groupData.foreach(println)
  }
}

reduceByKey
package com.lihaozhe.course02;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.List;

/**
 * reduceByKey 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析每个人消费的金额数据汇总
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */
public class JavaDemo08 {
    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "root");
        String appName = "reduceByKey";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // spark基础配置
        SparkConf conf = new SparkConf().setAppName(appName);
        // 本地运行
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // 从外部中创建 RDD
            // External Datasets
            // 使用本地文件系统
            // JavaRDD<String> javaRDD = sc.textFile("hdfs://spark01:8020/data/words.txt");
            JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv");
            JavaPairRDD<String, Integer> javaPairRDD = javaRDD.mapToPair((PairFunction<String, String, Integer>) word -> {
                String[] words = word.split(",");
                return new Tuple2<String, Integer>(words[0], Integer.parseInt(words[1]));
            });
            // JavaPairRDD<String, Integer> reduceRDD = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            //     @Override
            //     public Integer call(Integer integer, Integer integer2) throws Exception {
            //         return integer + integer2;
            //     }
            // });
            JavaPairRDD<String, Integer> reduceRDD = javaPairRDD.reduceByKey((Function2<Integer, Integer, Integer>) Integer::sum);
            List<Tuple2<String, Integer>> collect = reduceRDD.collect();
            collect.forEach(tup -> System.out.println(tup._1 + " >>> " + tup._2));
        }
    }
}

package com.lihaozhe.course02

import org.apache.spark.{SparkConf, SparkContext}

/**
 * reduceByKey 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析客总金额
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */
object ScalaDemo08 {
  def main(args: Array[String]): Unit = {
    val appName = "reduceByKey"
    // spark基础配置
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // 本地运行
    conf.setMaster("local")
    // 构建 SparkContext spark 上下文
    val sc = new SparkContext(conf)
    // 从外部中创建 RDD
    // External Datasets
    // 使用本地文件系统
    val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    // distData.foreach(println)
    // (person3,137)
    val tupleData = distData.map(line => (line.split(",")(0), line.split(",")(1).toInt))
    // (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))
    val sumData = tupleData.reduceByKey(_ + _)
    sumData.foreach(println)
  }
}

mapValues
package com.lihaozhe.course02;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * mapValues 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析客单价
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */
public class JavaDemo09 {
    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "root");
        String appName = "mapValues";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // spark基础配置
        SparkConf conf = new SparkConf().setAppName(appName);
        // 本地运行
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // 从外部中创建 RDD
            // External Datasets
            // 使用本地文件系统
            // JavaRDD<String> javaRDD = sc.textFile("hdfs://spark01:8020/data/words.txt");
            JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv");
            // javaRDD.mapToPair(new PairFunction<String, String, Integer>() {
            //     @Override
            //     public Tuple2<String, Integer> call(String s) throws Exception {
            //         return null;
            //    }
            // });
            JavaPairRDD<String, Integer> javaPairRDD = javaRDD.mapToPair((PairFunction<String, String, Integer>) word -> {
                String[] words = word.split(",");
                return new Tuple2<String, Integer>(words[0], Integer.parseInt(words[1]));
            });
            JavaPairRDD<String, Iterable<Integer>> groupRDD = javaPairRDD.groupByKey();
            JavaPairRDD<String, Double> avgRDD = groupRDD.mapValues(v -> {
                int sum = 0;
                Iterator<Integer> it = v.iterator();
                AtomicInteger atomicInteger = new AtomicInteger();
                while (it.hasNext()) {
                    Integer amount = it.next();
                    sum += amount;
                    atomicInteger.incrementAndGet();
                }
                return (double) sum / atomicInteger.get();
            });
            List<Tuple2<String, Double>> collect = avgRDD.collect();
            collect.forEach(tup -> System.out.println(tup._1 + " >>> " + tup._2));

//            Map<String, List<Tuple2<String, Integer>>> listMap = javaPairRDD.collect().stream().collect(Collectors.groupingBy(tup -> tup._1));
//            Set<Map.Entry<String, List<Tuple2<String, Integer>>>> entries = listMap.entrySet();
//            Iterator<Map.Entry<String, List<Tuple2<String, Integer>>>> it = entries.iterator();
//            Map<String, Double> map = new HashMap<>();
//            while (it.hasNext()) {
//                Map.Entry<String, List<Tuple2<String, Integer>>> entry = it.next();
//                Integer sum = entry.getValue().stream().map(tup -> tup._2).reduce(Integer::sum).orElse(0);
//                long count = entry.getValue().stream().map(tup -> tup._2).count();
//
//                map.put(entry.getKey(), Double.valueOf(sum) / count);
//            }
//            map.forEach((name, amount) -> System.out.println(name + " >>> " + amount));

        }
    }
}

package com.lihaozhe.course02

import org.apache.spark.{SparkConf, SparkContext}

/**
 * mapValues 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析客单价
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */
object ScalaDemo09 {
  def main(args: Array[String]): Unit = {
    val appName = "mapValues"
    // spark基础配置
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // 本地运行
    conf.setMaster("local")
    // 构建 SparkContext spark 上下文
    val sc = new SparkContext(conf)
    // 从外部中创建 RDD
    // External Datasets
    // 使用本地文件系统
    val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    // distData.foreach(println)
    // (person3,137)
    val tupleData = distData.map(line => (line.split(",")(0), line.split(",")(1).toInt))
    // (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))
    val groupData = tupleData.groupByKey()
    groupData.foreach(println)
    val avgData = groupData.mapValues(v => (v.sum.toDouble / v.size).formatted("%.2f"))
    avgData.foreach(println)
  }
}

sortByKey
package com.lihaozhe.course02;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.List;

/**
 * sortByKey reduceByKey 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析每个人消费的金额数据汇总
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */
public class JavaDemo10 {
    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "root");
        String appName = "sortByKey reduceByKey";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // spark基础配置
        SparkConf conf = new SparkConf().setAppName(appName);
        // 本地运行
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // 从外部中创建 RDD
            // External Datasets
            // 使用本地文件系统
            // JavaRDD<String> javaRDD = sc.textFile("hdfs://spark01:8020/data/words.txt");
            JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv");
            JavaPairRDD<String, Integer> javaPairRDD = javaRDD.mapToPair((PairFunction<String, String, Integer>) word -> {
                String[] words = word.split(",");
                return new Tuple2<String, Integer>(words[0], Integer.parseInt(words[1]));
            });
            // JavaPairRDD<String, Integer> reduceRDD = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            //     @Override
            //     public Integer call(Integer integer, Integer integer2) throws Exception {
            //         return integer + integer2;
            //     }
            // });
            JavaPairRDD<String, Integer> reduceRDD = javaPairRDD.reduceByKey((Function2<Integer, Integer, Integer>) Integer::sum);
            JavaPairRDD<String, Integer> soredRDD = reduceRDD.sortByKey(false);
            List<Tuple2<String, Integer>> collect = soredRDD.collect();
            collect.forEach(tup -> System.out.println(tup._1 + " >>> " + tup._2));
        }
    }
}

package com.lihaozhe.course02;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * sortByKey reduceByKey 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析客单价
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */
public class JavaDemo11 {
    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "root");
        String appName = "sortByKey reduceByKey";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // spark基础配置
        SparkConf conf = new SparkConf().setAppName(appName);
        // 本地运行
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // 从外部中创建 RDD
            // External Datasets
            // 使用本地文件系统
            // JavaRDD<String> javaRDD = sc.textFile("hdfs://spark01:8020/data/words.txt");
            JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv");
            // javaRDD.mapToPair(new PairFunction<String, String, Integer>() {
            //     @Override
            //     public Tuple2<String, Integer> call(String s) throws Exception {
            //         return null;
            //    }
            // });
            JavaPairRDD<String, Integer> javaPairRDD = javaRDD.mapToPair((PairFunction<String, String, Integer>) word -> {
                String[] words = word.split(",");
                return new Tuple2<String, Integer>(words[0], Integer.parseInt(words[1]));
            });
            JavaPairRDD<String, Iterable<Integer>> groupRDD = javaPairRDD.groupByKey();
            JavaPairRDD<String, Double> avgRDD = groupRDD.mapValues(v -> {
                int sum = 0;
                Iterator<Integer> it = v.iterator();
                AtomicInteger atomicInteger = new AtomicInteger();
                while (it.hasNext()) {
                    Integer amount = it.next();
                    sum += amount;
                    atomicInteger.incrementAndGet();
                }
                return (double) sum / atomicInteger.get();
            });
            JavaPairRDD<String, Double> sortedRDD = avgRDD.sortByKey(false);
            List<Tuple2<String, Double>> collect = sortedRDD.collect();
            collect.forEach(tup -> System.out.println(tup._1 + " >>> " + tup._2));

//            Map<String, List<Tuple2<String, Integer>>> listMap = javaPairRDD.collect().stream().collect(Collectors.groupingBy(tup -> tup._1));
//            Set<Map.Entry<String, List<Tuple2<String, Integer>>>> entries = listMap.entrySet();
//            Iterator<Map.Entry<String, List<Tuple2<String, Integer>>>> it = entries.iterator();
//            Map<String, Double> map = new HashMap<>();
//            while (it.hasNext()) {
//                Map.Entry<String, List<Tuple2<String, Integer>>> entry = it.next();
//                Integer sum = entry.getValue().stream().map(tup -> tup._2).reduce(Integer::sum).orElse(0);
//                long count = entry.getValue().stream().map(tup -> tup._2).count();
//
//                map.put(entry.getKey(), Double.valueOf(sum) / count);
//            }
//            map.forEach((name, amount) -> System.out.println(name + " >>> " + amount));

        }
    }
}

package com.lihaozhe.course02

import org.apache.spark.{SparkConf, SparkContext}

/**
 * sortByKey reduceByKey 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析客总金额
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */
object ScalaDemo10 {
  def main(args: Array[String]): Unit = {
    val appName = "sortByKey reduceByKey"
    // spark基础配置
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // 本地运行
    conf.setMaster("local")
    // 构建 SparkContext spark 上下文
    val sc = new SparkContext(conf)
    // 从外部中创建 RDD
    // External Datasets
    // 使用本地文件系统
    val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    // distData.foreach(println)
    // (person3,137)
    val tupleData = distData.map(line => (line.split(",")(0), line.split(",")(1).toInt))
    // (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))
    val sumData = tupleData.reduceByKey(_ + _)
    sumData.foreach(println)
    val swapData = sumData.map(_.swap)
    // 参数 true为升序 false为降序 默认为升序
    val sortData = swapData.sortByKey(ascending = false)
    sortData.foreach(println)
  }
}

package com.lihaozhe.course02

import org.apache.spark.{SparkConf, SparkContext}

/**
 * sortByKey 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析客单价
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */
object ScalaDemo11 {
  def main(args: Array[String]): Unit = {
    val appName = "sortByKey reduceByKey"
    // spark基础配置
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // 本地运行
    conf.setMaster("local")
    // 构建 SparkContext spark 上下文
    val sc = new SparkContext(conf)
    // 从外部中创建 RDD
    // External Datasets
    // 使用本地文件系统
    val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    // distData.foreach(println)
    // (person3,137)
    val tupleData = distData.map(line => (line.split(",")(0), line.split(",")(1).toInt))
    // (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))
    val groupData = tupleData.groupByKey()
    groupData.foreach(println)
    val avgData = groupData.mapValues(v => (v.sum.toDouble / v.size))
    avgData.foreach(println)
    val swapData = avgData.map(_.swap)
    // 参数 true为升序 false为降序 默认为升序
    val sortData = swapData.sortByKey(ascending = false)
    sortData.foreach(v => println(v._2 + "," + v._1.formatted("%.2f")))
  }
}

sortBy
package com.lihaozhe.course02

import org.apache.spark.{SparkConf, SparkContext}

/**
 * sortBy reduceByKey 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析客总金额
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */
object ScalaDemo12 {
  def main(args: Array[String]): Unit = {
    val appName = "sortBy reduceByKey"
    // spark基础配置
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // 本地运行
    conf.setMaster("local")
    // 构建 SparkContext spark 上下文
    val sc = new SparkContext(conf)
    // 从外部中创建 RDD
    // External Datasets
    // 使用本地文件系统
    val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    // distData.foreach(println)
    // (person3,137)
    val tupleData = distData.map(line => (line.split(",")(0), line.split(",")(1).toInt))
    // (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))
    val sumData = tupleData.reduceByKey(_ + _)
    sumData.foreach(println)
    // 参数 true为升序 false为降序 默认为升序
    val sortedData = sumData.sortBy(_._2, ascending = false)
    sortedData.foreach(println)
  }
}

package com.lihaozhe.course02

import org.apache.spark.{SparkConf, SparkContext}

/**
 * sortBy 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析客单价
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */
object ScalaDemo13 {
  def main(args: Array[String]): Unit = {
    val appName = "sortBy reduceByKey"
    // spark基础配置
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // 本地运行
    conf.setMaster("local")
    // 构建 SparkContext spark 上下文
    val sc = new SparkContext(conf)
    // 从外部中创建 RDD
    // External Datasets
    // 使用本地文件系统
    val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    // distData.foreach(println)
    // (person3,137)
    val tupleData = distData.map(line => (line.split(",")(0), line.split(",")(1).toInt))
    // (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))
    val groupData = tupleData.groupByKey()
    groupData.foreach(println)
    val avgData = groupData.mapValues(v => (v.sum.toDouble / v.size).formatted("%.2f").toDouble)
    // 参数 true为升序 false为降序 默认为升序
    val sortedData = avgData.sortBy(_._2, ascending = false)
    sortedData.foreach(println)
  }
}

join
package com.lihaozhe.course02

import org.apache.spark.{SparkConf, SparkContext}

/**
 * join 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析客总金额
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */
object ScalaDemo14 {
  def main(args: Array[String]): Unit = {
    val appName = "join"
    // spark基础配置
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // 本地运行
    conf.setMaster("local")
    // 构建 SparkContext spark 上下文
    val sc = new SparkContext(conf)
    // 从外部中创建 RDD
    // External Datasets
    // 使用本地文件系统
    val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    // distData.foreach(println)
    // (person3,137)
    val tupleData = distData.map(line => (line.split(",")(0), line.split(",")(1).toInt))
    // (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))
    val sumData = tupleData.reduceByKey(_ + _)
    val groupData = tupleData.groupByKey()
    val avgData = groupData.mapValues(v => (v.sum.toDouble / v.size).formatted("%.2f").toDouble)
    val rsData = sumData.join(avgData)
    rsData.foreach(println)
  }
}

WordCount
JavaWordCount
package com.lihaozhe.course03;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;

/**
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 下午4:16
 */
public class JavaWordCount {
    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "root");
        String appName = "WordCount";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        SparkConf conf = new SparkConf().setAppName(appName);
        // 本地运行
        // conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaRDD<String> javaRDD = sc.textFile("/data/words.txt");
            JavaRDD<String> wordsRdd = javaRDD.flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).listIterator());
            JavaPairRDD<String, Integer> javaPairRDD = wordsRdd.mapToPair((PairFunction<String, String, Integer>) word -> new Tuple2<>(word, 1));
            JavaPairRDD<String, Integer> rs = javaPairRDD.reduceByKey((Function2<Integer, Integer, Integer>) Integer::sum);
            rs.saveAsTextFile("/data/result");
            sc.stop();
        }
    }
}

ScalaWordCount
package com.lihaozhe.course03

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 下午3:51
 */
object ScalaWordCount01 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    // val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val conf = new SparkConf().setAppName("WordCount")
    // conf.setMaster("local")
    val sc = new SparkContext(conf)
    val content = sc.textFile("/data/words.txt")
    // content.foreach(println)
    val words = content.flatMap(_.split(" "))
    // words.foreach(println)
    // (love,Seq(love, love, love, love, love))
    val wordGroup = words.groupBy(word => word)
    // wordGroup.foreach(println)
    // (love,5)
    val wordCount = wordGroup.mapValues(_.size)
    // wordCount.foreach(println)
    wordCount.saveAsTextFile("/data/result");
    sc.stop()
  }
}

package com.lihaozhe.course03

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 下午3:51
 */
object ScalaWordCount02 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    // val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val conf = new SparkConf().setAppName("WordCount")
    // conf.setMaster("local")
    val sc = new SparkContext(conf)
    val content = sc.textFile("/data/words.txt")
    // content.foreach(println)
    val words = content.flatMap(_.split(" "))
    // words.foreach(println)
    // (love,Seq(love, love, love, love, love))
    val wordMap = words.map((_, 1))
    // wordGroup.foreach(println)
    // (love,5)
    val wordCount = wordMap.reduceByKey(_ + _)
    //wordCount.foreach(println)
    wordCount.saveAsTextFile("/data/result");
    sc.stop()
  }
}

Package the project

mvn package

Upload the jar file to the cluster

Submit on the cluster (note that spark-submit options such as --deploy-mode must come before the application jar; anything placed after the jar is passed to the application as program arguments)

standalone client mode: spark-submit --master spark://spark01:7077 --deploy-mode client

standalone cluster mode: spark-submit --master spark://spark01:7077 --deploy-mode cluster

yarn client mode: spark-submit --master yarn --deploy-mode client

yarn cluster mode: spark-submit --master yarn --deploy-mode cluster

spark-submit --master yarn --deploy-mode cluster --class com.lihaozhe.course03.JavaWordCount spark-code.jar
spark-submit --master yarn --deploy-mode cluster --class com.lihaozhe.course03.ScalaWordCount01 spark-code.jar
spark-submit --master yarn --deploy-mode cluster --class com.lihaozhe.course03.ScalaWordCount02 spark-code.jar

Spark SQL

package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * 构建 dataFrame
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo01 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    // 读取 csv 文件获取 dataFrame
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/info.csv")
    // root
    //   |-- _c0: string (nullable = true)
    //   |-- _c1: string (nullable = true)
    df.printSchema()
    spark.stop()
  }
}

package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * show
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo02 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    // 读取 csv 文件获取 dataFrame
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/info.csv")
    df.show()
    spark.stop()
  }
}

package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * option 是否将第一列作为字段名 header默认值为 false
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo03 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    // 读取 csv 文件获取 dataFrame
    val df = spark.read
      .option("header", "true") // 是否将第一列作为字段名 header默认值为 false
      .csv("file:///home/lsl/IdeaProjects/spark-code/info.csv")
    // root
    //   |-- name: string (nullable = true)
    //   |-- amount: string (nullable = true)
    df.printSchema()
    df.show()
    spark.stop()
  }
}

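For reference, the csv reader can also infer column types instead of treating every column as a string. This is a small sketch on the same info.csv, not part of the original listing (the object name ScalaDemo03Infer is made up):

package com.lihaozhe.course04

import org.apache.spark.sql.SparkSession

object ScalaDemo03Infer {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("infer schema").master("local").getOrCreate()
    val df = spark.read
      .option("header", "true")      // first line holds the field names
      .option("inferSchema", "true") // sample the file to guess column types
      .csv("file:///home/lsl/IdeaProjects/spark-code/info.csv")
    // amount now comes back as a numeric type instead of string
    df.printSchema()
    spark.stop()
  }
}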
package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * select
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo04 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    // 读取 csv 文件获取 dataFrame
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    // root
    //   |-- _c0: string (nullable = true)
    //   |-- _c1: string (nullable = true)
    df.printSchema()
    df.select("_c0", "_c1").show()
    spark.stop()
  }
}

package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * withColumnRenamed
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo05 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    // 读取 csv 文件获取 dataFrame
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.withColumnRenamed("_c0", "name")
      .withColumnRenamed("_c1", "amount")
      .printSchema()
    // root
    //   |-- name: string (nullable = true)
    //   |-- amount: string (nullable = true)

    spark.stop()
  }
}

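When every column gets a new name, toDF can rename them in one call instead of chaining withColumnRenamed. A minimal sketch on the same data.csv (the object name ScalaDemo05ToDF is made up):

package com.lihaozhe.course04

import org.apache.spark.sql.SparkSession

object ScalaDemo05ToDF {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("toDF rename").master("local").getOrCreate()
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    // rename all columns at once: _c0 -> name, _c1 -> amount
    val renamed = df.toDF("name", "amount")
    renamed.printSchema()
    spark.stop()
  }
}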
package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}

/**
 * col("原字段名").cast(数据类型).as("新字段名"),
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo06 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    // 读取 csv 文件获取 dataFrame
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.select(
      col("_c0").cast(StringType).as("name"),
      col("_c1").cast(IntegerType).as("amount"),
    ).show()
    spark.stop()
  }
}

package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * topN
 * show(n,truncate = false)默认显示前20条记录 numRows 记录数 truncate 显示结果是否裁剪 默认值为true为裁剪 false为不裁剪
 * first() 第一条记录
 * take(n)
 * head(n)
 * tail(n) 最后n条记录
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo07 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    // 读取 csv 文件获取 dataFrame
    val df = spark.read
      .option("header", "true") // 是否将第一列作为字段名 header默认值为 false
      .csv("file:///home/lsl/IdeaProjects/spark-code/info.csv")
    // df.show(5,truncate = false)
    // println(df.first())
    // df.take(3).foreach(println)
    // df.head(3).foreach(println)
    df.tail(3).foreach(println)
    spark.stop()
  }
}

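take(n) and head(n) above return the first rows in input order, not the largest values. If the goal is the top n amounts, sort explicitly before limiting. A small sketch, not part of the original listing (the object name ScalaDemo07TopN is made up; inferSchema is enabled so amount is numeric):

package com.lihaozhe.course04

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object ScalaDemo07TopN {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("topN by amount").master("local").getOrCreate()
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true") // amount becomes numeric, so the sort is by value not by text
      .csv("file:///home/lsl/IdeaProjects/spark-code/info.csv")
    // three largest amounts
    df.orderBy(col("amount").desc).limit(3).show(truncate = false)
    spark.stop()
  }
}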
package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}

/**
 * where 按条件查询
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo08 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    // 读取 csv 文件获取 dataFrame
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.select(
      col("_c0").cast(StringType).as("name"),
      col("_c1").cast(IntegerType).as("amount"),
    ).where("amount > 100").show()
    spark.stop()
  }
}

package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}

/**
 * where 按条件查询
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo09 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    // 读取 csv 文件获取 dataFrame
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.select(
      col("_c0").cast(StringType).as("name"),
      col("_c1").cast(IntegerType).as("amount"),
    ).where(col("amount") > 100).show()
    spark.stop()
  }
}

package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}

/**
 * filter 按条件查询
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo10 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    // 读取 csv 文件获取 dataFrame
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.select(
      col("_c0").cast(StringType).as("name"),
      col("_c1").cast(IntegerType).as("amount"),
    ).filter("amount > 100").show()
    spark.stop()
  }
}

package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}

/**
 * filter 按条件查询
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo11 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    // 读取 csv 文件获取 dataFrame
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.select(
      col("_c0").cast(StringType).as("name"),
      col("_c1").cast(IntegerType).as("amount"),
    ).filter(col("amount") > 100).show()
    spark.stop()
  }
}

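Column predicates can be combined with && and ||, and === compares a column against a value. A short sketch (the object name ScalaDemo11And and the person1 value are illustrative) that keeps only amounts over 100 for one customer:

package com.lihaozhe.course04

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}

object ScalaDemo11And {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("combined filter").master("local").getOrCreate()
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.select(
      col("_c0").cast(StringType).as("name"),
      col("_c1").cast(IntegerType).as("amount")
    )
      // both conditions must hold for a row to survive
      .filter(col("amount") > 100 && col("name") === "person1")
      .show()
    spark.stop()
  }
}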
package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}

/**
 * groupBy count 分组聚合
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo12 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    // 读取 csv 文件获取 dataFrame
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.select(
      col("_c0").cast(StringType).as("name"),
      col("_c1").cast(IntegerType).as("amount"),
    ).groupBy(col("name")).count().show()
    spark.stop()
  }
}
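
count() is only one possible aggregation; agg() lets several run inside the same groupBy. A minimal sketch computing the total and average amount per name (the object name ScalaDemo12Agg is made up, not part of the original listing):

package com.lihaozhe.course04

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, col, sum}
import org.apache.spark.sql.types.{IntegerType, StringType}

object ScalaDemo12Agg {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("groupBy agg").master("local").getOrCreate()
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.select(
      col("_c0").cast(StringType).as("name"),
      col("_c1").cast(IntegerType).as("amount")
    )
      .groupBy(col("name"))
      // several aggregations in one pass over each group
      .agg(sum("amount").as("total"), avg("amount").as("avg_amount"))
      .show()
    spark.stop()
  }
}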
package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * SQLContext
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo13 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    // 读取 csv 文件获取 dataFrame
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    // withColumnRenamed 返回新的 DataFrame,需要用变量接收,否则重命名结果会被丢弃
    val renamed = df.withColumnRenamed("_c0", "name")
      .withColumnRenamed("_c1", "amount")
    // 使用 DataFrame 生成一张临时表
    renamed.createOrReplaceTempView("order_info")
    // 获取 SQLContext 对象
    val sqlContext = spark.sqlContext
    sqlContext.sql("select name, amount from order_info where amount > 100").show(false)
    spark.stop()
  }
}

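The separate SQLContext handle is optional: the SparkSession itself exposes sql(), so the query in ScalaDemo13 can also be issued directly. A small sketch, not from the original listing (the object name ScalaDemo13SparkSql is made up):

package com.lihaozhe.course04

import org.apache.spark.sql.SparkSession

object ScalaDemo13SparkSql {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("spark.sql").master("local").getOrCreate()
    // rename the csv columns, then register the DataFrame as a temporary view
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
      .toDF("name", "amount")
    df.createOrReplaceTempView("order_info")
    // query through the SparkSession directly
    spark.sql("select name, amount from order_info where amount > 100").show(truncate = false)
    spark.stop()
  }
}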
package com.lihaozhe.course05

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * DataFrame 与 DataSet
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo01 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    // 读取 csv 文件获取 dataFrame
    val df = spark.read
      .option("header", "true") // 是否将第一列作为字段名 header默认值为 false
      .csv("file:///home/lsl/IdeaProjects/spark-code/info.csv")
    val ds = df.as[OrderInfo]
    val rdd = ds.map(orderInfo => (orderInfo.name, orderInfo.amount.toInt)).rdd
    rdd.reduceByKey(_ + _).foreach(println)
    spark.stop()
  }
}
case class OrderInfo(name: String, amount: String)

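Besides reading files, a Dataset can be built directly from local objects, which is handy for quick tests. A minimal sketch reusing the OrderInfo case class above (the object name and the sample values are illustrative, not part of the original code):

package com.lihaozhe.course05

import org.apache.spark.sql.SparkSession

object ScalaDatasetFromSeq {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Dataset from Seq").master("local").getOrCreate()
    // the encoder for OrderInfo comes from the implicits import
    import spark.implicits._
    val ds = Seq(OrderInfo("person1", "100"), OrderInfo("person2", "37")).toDS()
    ds.show()
    spark.stop()
  }
}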
package com.lihaozhe.course05

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * 创建 DataSet
 * spark.read.text 方法返回值为 DataFrame
 * spark.read.textFile 方法返回值为 DataSet
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo02 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    // 读取 csv 文件获取 dataFrame
    val df = spark.read.text("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    val ds = spark.read.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    spark.stop()
  }
}

package com.lihaozhe.course05

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 *  直接使用 spark.read.text 或者 spark.read.textFile 读进来的数据 只有一个 String 类型 名字为 value 的值
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo03 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    // 读取 csv 文件获取 dataFrame
    val df = spark.read.text("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    val ds = spark.read.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.show(5, truncate = false)
    ds.show(5, truncate = false)
    spark.stop()
  }
}

package com.lihaozhe.course05

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * Interoperating with RDDs
 * 在字段较少的情况下 使用反射推导出 RDD schema 信息
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo04 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._

    // Create an RDD of Person objects from a text file, convert it to a Dataframe
    val OrderDf = spark.sparkContext
      .textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
      .map(_.split(","))
      .map(attributes => OrderSchema(attributes(0), attributes(1).trim.toInt))
      .toDF()
    // Register the DataFrame as a temporary view
    OrderDf.createOrReplaceTempView("order_info")

    // SQL statements can be run by using the sql methods provided by Spark
    val teenagersDF = spark.sql("SELECT name, amount FROM order_info WHERE amount BETWEEN 100 AND 150")
    teenagersDF.map(teenager => "Name: " + teenager(0)).show(3, false)
    teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show(3, truncate = false)
    // No pre-defined encoders for Dataset[Map[K,V]], define explicitly


    // 一次读取一行数据并将数据封装到map中
    // 基本类型和case类也可以定义为隐式val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
    implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
    val array = teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "amount"))).collect()
    array.foreach(println)
    teenagersDF.toJSON.show(3, truncate = false)
    spark.stop()
  }
}

case class OrderSchema(name: String, amount: Int)

package com.lihaozhe.course05

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * Interoperating with RDDs
 * 在无法预先定义 case class 时,通过编程方式(StructType)指定 RDD 的 schema 信息
 * 1、Create an RDD of Rows from the original RDD;
 * 1、从原RDD的行中创建一个RDD;
 * 2、Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
 * 2、创建由StructType表示的模式,该模式与步骤1中创建的RDD中的Rows结构匹配。
 * 3、Apply the schema to the RDD of Rows via createDataFrame method provided by SparkSession.
 * 3、通过SparkSession提供的createDataFrame方法将schema应用到RDD的行。
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo05 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._

    val sc = spark.sparkContext
    val ds = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    // 1、从原RDD的行中创建一个RDD;
    val rowRDD = ds.map(_.split(",")).map(content => Row(content(0), content(1).toInt))

    // 2、创建由 StructType 表示的模式,该模式与步骤1中创建的RDD中的Rows结构匹配。
    val struct = StructType(Array(
      StructField("name", StringType, nullable = true),
      StructField("amount", IntegerType, nullable = true)
    ))
    // 3、通过 SparkSession 提供的 createDataFrame 方法将 schema 应用到 RDD 的行。
    val df = spark.createDataFrame(rowRDD, struct)
    df.printSchema()
    // root
    //  |-- name: string  (nullable = true)
    //  |-- amount: integer (nullable = true)
    spark.stop()
  }
}

package com.lihaozhe.course05

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * Interoperating with RDDs
 * 在无法预先定义 case class 时,通过编程方式(StructType)指定 RDD 的 schema 信息
 * 1、Create an RDD of Rows from the original RDD;
 * 1、从原RDD的行中创建一个RDD;
 * 2、Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
 * 2、创建由StructType表示的模式,该模式与步骤1中创建的RDD中的Rows结构匹配。
 * 3、Apply the schema to the RDD of Rows via createDataFrame method provided by SparkSession.
 * 3、通过SparkSession提供的createDataFrame方法将schema应用到RDD的行。
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo06 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换

    val sc = spark.sparkContext
    val ds = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    // 1、从原RDD的行中创建一个RDD;
    // schema 中两个字段都是 StringType,所以 Row 中保留字符串,避免类型不匹配
    val rowRDD = ds.map(_.split(",")).map(content => Row(content(0), content(1)))

    // 2、创建由 StructType 表示的模式,该模式与步骤1中创建的RDD中的Rows结构匹配。
    val schemaString = "name amount"

    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    // 3、通过 SparkSession 提供的 createDataFrame 方法将 schema 应用到 RDD 的行。
    val df = spark.createDataFrame(rowRDD, schema)
    df.printSchema()
    // root
    //  |-- name: string  (nullable = true)
    //  |-- amount: string (nullable = true)
    spark.stop()
  }
}

package com.lihaozhe.course06

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
 * DataSource csv parquet
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo01 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    val df = spark.read.option("header", "true").format("csv").load("file:///home/lsl/IdeaProjects/spark-code/info.csv")
    df.select("name", "amount").write.mode(SaveMode.Overwrite).format("parquet").save("/data/spark/parquet")
    spark.stop()
  }
}

package com.lihaozhe.course06

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
 * DataSource parquet json
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo02 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    val df = spark.read.option("header", "true").format("parquet").load("/data/spark/parquet")
    df.select("name", "amount").write.mode(SaveMode.Overwrite).format("json").save("/data/spark/json")
    spark.stop()
  }
}

package com.lihaozhe.course06

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * DataSource JDBC MySQL Load Save
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo03 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://spark03")
      .option("dbtable", "lihaozhe.dujitang")
      .option("user", "root")
      .option("password", "Lihaozhe!!@@1122")
      .load()
    jdbcDF.show()
    spark.stop()
  }
}

package com.lihaozhe.course06

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import java.util.Properties

/**
 * DataSource JDBC MySQL
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo04 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "Lihaozhe!!@@1122")
    val jdbcDF = spark.read
      .jdbc("jdbc:mysql://spark03", "lihaozhe.dujitang", connectionProperties)
    jdbcDF.show()
    spark.stop()
  }
}

package com.lihaozhe.course06

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import java.util.Properties

/**
 * DataSource JDBC MySQL 自定义数据类型
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo05 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "Lihaozhe!!@@1122")
    // 自定义数据类型
    connectionProperties.put("customSchema", "id STRING, content STRING")
    val jdbcDF = spark.read
      .jdbc("jdbc:mysql://spark03", "lihaozhe.dujitang", connectionProperties)
    jdbcDF.printSchema()
    spark.stop()
  }
}

package com.lihaozhe.course06

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

/**
 * DataSource JDBC MySQL Load Save
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo06 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    val sc = spark.sparkContext
    val ds = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    val rowRDD = ds.map(_.split(",")).map(content => Row(content(0), content(1).toInt))
    val struct = StructType(Array(
      StructField("name", StringType, nullable = true),
      StructField("amount", IntegerType, nullable = true)
    ))
    val df = spark.createDataFrame(rowRDD, struct)
    df.write
      .format("jdbc")
      .option("url", "jdbc:mysql://spark03")
      .option("dbtable", "lihaozhe.data")
      .option("user", "root")
      .option("password", "Lihaozhe!!@@1122")
      .mode(SaveMode.Append)
      .save()
    spark.stop()
  }
}

package com.lihaozhe.course06

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

import java.util.Properties

/**
 * DataSource JDBC MySQL Load Save
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo07 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    val sc = spark.sparkContext
    val ds = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    val rowRDD = ds.map(_.split(",")).map(content => Row(content(0), content(1).toInt))
    val struct = StructType(Array(
      StructField("name", StringType, nullable = true),
      StructField("amount", IntegerType, nullable = true)
    ))
    val df = spark.createDataFrame(rowRDD, struct)

    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "Lihaozhe!!@@1122")
    df.write
      .mode(SaveMode.Append)
      .jdbc("jdbc:mysql://spark03", "lihaozhe.data", connectionProperties)
    spark.stop()
  }
}

package com.lihaozhe.course06

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

import java.util.Properties

/**
 * DataSource JDBC MySQL Load Save
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */
object ScalaDemo08 {
  def main(args: Array[String]): Unit = {
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    val sc = spark.sparkContext
    val ds = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    val rowRDD = ds.map(_.split(",")).map(content => Row(content(0), content(1).toInt))
    val struct = StructType(Array(
      StructField("name", StringType, nullable = true),
      StructField("amount", IntegerType, nullable = true)
    ))
    val df = spark.createDataFrame(rowRDD, struct)

    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "Lihaozhe!!@@1122")
    df.write
      .mode(SaveMode.Append)
      .option("createTableColumnTypes", "name VARCHAR(33)")
      .jdbc("jdbc:mysql://spark03", "lihaozhe.data", connectionProperties)
    spark.stop()
  }
}

hdfs dfs -mkdir -p /lihaozhe
hdfs dfs -mkdir -p /hive/info
create database lihaozhe location '/lihaozhe';
use lihaozhe;
create external table `info`(
    name string comment '姓名',
    amount int comment '金额'
) comment '订单表'
 row format delimited fields terminated by ','
 lines terminated by '\n'
 stored as textfile
 location '/hive/info';
load data local inpath  '/root/data.csv' into table info; 
package com.lihaozhe.course07

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession


/**
 * spark on hive
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 下午4:07
 */
object ScalaHiveSource {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    // 基础配置
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL hive example")
      .config(conf)
      .enableHiveSupport()
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    import spark.sql

    sql("select * from lihaozhe.info").show()
    spark.stop()

  }
}


Error message: Couldn't create directory /user/hive/resources

Cause:
The error occurs because the hive.downloaded.resources.dir property in the hive-site.xml configuration file points to /user/hive/resources, which cannot be created locally.

  • Spark SQL uses the settings in hive-site.xml to connect to the metastore database, and hdfs-site.xml and core-site.xml to connect to HDFS.
  • The /user/hive/resources directory from that configuration is created on the machine running IDEA,
    i.e. the local Windows, macOS, or Linux host, not on the remote server.
    Because /user/hive/resources cannot be created on the local file system, the job fails.

Fix for org.apache.spark.sql.AnalysisException: java.lang.RuntimeException: Couldn't create directory /user/hive/resources
Create the directory on the local file system.

macOS or Linux

sudo mkdir -p /user/hive/resources

Windows
Create the directory on the drive where the project lives.
For example, if the project is on drive D:, create the directory d:\user\hive\resources
md d:\user\hive\resources
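An alternative to creating the directory by hand is to point hive.downloaded.resources.dir at a directory that already exists on the local machine. The sketch below is not from the original write-up (the object name is made up); it passes the property through the spark.hadoop.* prefix, which Spark copies into the Hadoop/Hive configuration. Whether the override takes effect can depend on the Spark and Hive versions in use, so treat it as something to try rather than a guaranteed fix.

package com.lihaozhe.course07

import org.apache.spark.sql.SparkSession

object ScalaHiveResourcesDir {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val spark = SparkSession
      .builder()
      .appName("Spark SQL hive example")
      .master("local")
      // assumption: override the hive-site.xml value with a writable local directory
      .config("spark.hadoop.hive.downloaded.resources.dir", "/tmp/hive/resources")
      .enableHiveSupport()
      .getOrCreate()
    spark.sql("select * from lihaozhe.info").show()
    spark.stop()
  }
}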
