Spark: From Beginner to Expert
Environment Setup
Preparation
Create the installation directory
mkdir /opt/soft
cd /opt/soft
Download Scala
wget https://downloads.lightbend.com/scala/2.13.10/scala-2.13.10.tgz -P /opt/soft
Extract Scala
tar -zxvf scala-2.13.10.tgz
Rename the Scala directory
mv scala-2.13.10 scala-2
Download Spark (prebuilt for Hadoop 3 and Scala 2.13)
wget https://dlcdn.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3-scala2.13.tgz -P /opt/soft
Extract Spark
tar -zxvf spark-3.4.0-bin-hadoop3-scala2.13.tgz
Rename the Spark directory
mv spark-3.4.0-bin-hadoop3-scala2.13 spark3
Edit the environment variables
vim /etc/profile
export JAVA_HOME=/opt/soft/jdk8
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export ZOOKEEPER_HOME=/opt/soft/zookeeper
export HADOOP_HOME=/opt/soft/hadoop3
export HADOOP_INSTALL=${HADOOP_HOME}
export HADOOP_MAPRED_HOME=${HADOOP_HOME}
export HADOOP_COMMON_HOME=${HADOOP_HOME}
export HADOOP_HDFS_HOME=${HADOOP_HOME}
export YARN_HOME=${HADOOP_HOME}
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
export HIVE_HOME=/opt/soft/hive3
export HCAT_HOME=/opt/soft/hive3/hcatalog
export SQOOP_HOME=/opt/soft/sqoop-1
export FLUME_HOME=/opt/soft/flume
export HBASE_HOME=/opt/soft/hbase2
export PHOENIX_HOME=/opt/soft/phoenix
export SCALA_HOME=/opt/soft/scala-2
export SPARK_HOME=/opt/soft/spark3
export SPARKPYTHON=/opt/soft/spark3/python
export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HIVE_HOME/bin:$HCAT_HOME/bin:$SQOOP_HOME/bin:$FLUME_HOME/bin:$HBASE_HOME/bin:$PHOENIX_HOME/bin:$SCALA_HOME/bin:$SPARK_HOME/bin:$SPARK_HOME/sbin:$SPARKPYTHON
source /etc/profile
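Local mode
Scala / Java shell (spark-shell)
Start
spark-shell
Web UI: http://spark01:4040
Exit
:quit
PySpark
Start
pyspark
Web UI: http://spark01:4040
Exit
quit() or Ctrl-D
Submitting an application in local mode
Run from the Spark installation directory (/opt/soft/spark3)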
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_2.13-3.4.0.jar \
10
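- --class: the main class of the program to run; replace it with the main class of your own application
- --master local[2]: the deployment mode; local runs everything in one JVM on this machine, and the number in brackets is the number of worker threads (virtual CPU cores) to use. If --master is omitted, spark-submit falls back to local[*]
- spark-examples_2.13-3.4.0.jar: the jar that contains the application class; in practice, point this at the jar you built yourself
- 10: the argument passed to the program's main method; for SparkPi it sets the number of slices (partitions), and therefore the number of tasks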
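To make the role of the trailing 10 more concrete, here is a single-JVM sketch of the Monte Carlo estimate that SparkPi computes. It is plain Java, not Spark code; the class name SparkPiSketch and the fixed random seed are assumptions for illustration. As in SparkPi, the number of samples grows with the slices argument (100000 x slices); in SparkPi the same number also decides how many partitions, and therefore tasks, the sampling is split into, which this sketch does not model.

```java
import java.util.Random;

// Hypothetical single-JVM sketch of the Monte Carlo estimate behind SparkPi (no Spark involved).
final class SparkPiSketch {
    // slices mirrors the "10" argument; here it only scales the number of samples
    static double estimate(int slices, long seed) {
        int n = (int) Math.min(100_000L * slices, Integer.MAX_VALUE);
        Random random = new Random(seed);
        long inside = 0;
        // n - 1 samples, like iterating over the range 1 until n
        for (int i = 1; i < n; i++) {
            double x = random.nextDouble() * 2 - 1;
            double y = random.nextDouble() * 2 - 1;
            if (x * x + y * y <= 1) {
                inside++;
            }
        }
        // (points inside the unit circle) / (points in the 2 x 2 square) approximates pi / 4
        return 4.0 * inside / (n - 1);
    }

    public static void main(String[] args) {
        System.out.println("Pi is roughly " + estimate(10, 42L));
    }
}
```

A larger slices value means more samples and a tighter estimate; on a cluster it also means more, smaller tasks.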
Standalone mode
Write the core configuration file
In the conf directory
cd /opt/soft/spark3/conf
cp spark-env.sh.template spark-env.sh
vim spark-env.sh
export JAVA_HOME=/opt/soft/jdk8
export HADOOP_HOME=/opt/soft/hadoop3
export HADOOP_CONF_DIR=/opt/soft/hadoop3/etc/hadoop
export JAVA_LIBRARY_PATH=/opt/soft/hadoop3/lib/native
export SPARK_MASTER_HOST=spark01
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_MEMORY=4g
export SPARK_WORKER_CORES=4
export SPARK_MASTER_WEBUI_PORT=6633
Edit workers (this file was called slaves before Spark 3.0)
cp workers.template workers
vim workers
spark01
spark02
spark03
Configure event logging for the History Server
cp spark-defaults.conf.template spark-defaults.conf
vim spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir hdfs://lihaozhe/spark-log
hdfs dfs -mkdir /spark-log
vim spark-env.sh
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.retainedApplications=30
-Dspark.history.fs.logDirectory=hdfs://lihaozhe/spark-log"
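SPARK_HISTORY_OPTS is just a list of JVM -Dkey=value options that the History Server picks up as Spark properties, and spark.history.fs.logDirectory should point at the same location as spark.eventLog.dir so the server can find the logs that applications write. The sketch below splits such a string into key/value pairs to show what each line contributes. The class HistoryOpts is hypothetical, and the whitespace split is a simplification that ignores shell quoting.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: splits a SPARK_HISTORY_OPTS-style string into the
// -Dkey=value pairs the History Server JVM would receive as system properties.
final class HistoryOpts {
    static Map<String, String> parse(String opts) {
        Map<String, String> props = new LinkedHashMap<>();
        // Simplified: tokens are separated by any whitespace, quoting is not handled
        for (String token : opts.trim().split("\\s+")) {
            int eq = token.indexOf('=');
            if (token.startsWith("-D") && eq > 2) {
                props.put(token.substring(2, eq), token.substring(eq + 1));
            }
        }
        return props;
    }

    public static void main(String[] args) {
        String opts = "\n-Dspark.history.ui.port=18080\n"
                + "-Dspark.history.retainedApplications=30\n"
                + "-Dspark.history.fs.logDirectory=hdfs://lihaozhe/spark-log";
        parse(opts).forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```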
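Rename the start/stop scripts (Hadoop's sbin directory also contains start-all.sh and stop-all.sh, and both directories are on the PATH)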
mv sbin/start-all.sh sbin/start-spark.sh
mv sbin/stop-all.sh sbin/stop-spark.sh
Distribute to the other nodes
scp -r /opt/soft/spark3 root@spark02:/opt/soft
scp -r /opt/soft/spark3 root@spark03:/opt/soft
scp -r /etc/profile root@spark02:/etc
scp -r /etc/profile root@spark03:/etc
Reload the environment variables on the other nodes
source /etc/profile
Start
start-spark.sh
start-history-server.sh
Web UI
http://spark01:6633 (Master)
http://spark01:18080 (History Server)
Submit a job to the cluster
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://spark01:7077 \
./examples/jars/spark-examples_2.13-3.4.0.jar \
10
HA mode
Write the core configuration file
In the conf directory
cd /opt/soft/spark3/conf
cp spark-env.sh.template spark-env.sh
vim spark-env.sh
export JAVA_HOME=/opt/soft/jdk8
export HADOOP_HOME=/opt/soft/hadoop3
export HADOOP_CONF_DIR=/opt/soft/hadoop3/etc/hadoop
export JAVA_LIBRARY_PATH=/opt/soft/hadoop3/lib/native
# SPARK_MASTER_HOST is not set in HA mode; the Masters coordinate through ZooKeeper
export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=spark01:2181,spark02:2181,spark03:2181
-Dspark.deploy.zookeeper.dir=/spark3"
export SPARK_WORKER_MEMORY=4g
export SPARK_WORKER_CORES=4
export SPARK_MASTER_WEBUI_PORT=6633
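Note: /spark3 in spark.deploy.zookeeper.dir is a ZooKeeper znode path, not an HDFS directory; Spark creates it in ZooKeeper, so no hdfs dfs -mkdir is needed for it.
Edit workers (this file was called slaves before Spark 3.0)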
cp workers.template workers
vim workers
spark01
spark02
spark03
Configure event logging for the History Server
cp spark-defaults.conf.template spark-defaults.conf
vim spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir hdfs://lihaozhe/spark-log
hdfs dfs -mkdir /spark-log
vim spark-env.sh
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.retainedApplications=30
-Dspark.history.fs.logDirectory=hdfs://lihaozhe/spark-log"
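Rename the start/stop scripts (Hadoop's sbin directory also contains start-all.sh and stop-all.sh, and both directories are on the PATH)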
mv sbin/start-all.sh sbin/start-spark.sh
mv sbin/stop-all.sh sbin/stop-spark.sh
Distribute to the other nodes
scp -r /opt/soft/spark3 root@spark02:/opt/soft
scp -r /opt/soft/spark3 root@spark03:/opt/soft
scp -r /etc/profile root@spark02:/etc
scp -r /etc/profile root@spark03:/etc
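Reload the environment variables on the other nodes
source /etc/profile
Start
start-spark.sh
start-history-server.sh
# On the node chosen as the standby Master (spark02 is assumed here), start a second Master
start-master.sh
Web UI
http://spark01:6633 (Master)
http://spark01:18080 (History Server)
Submit a job to the cluster (list every Master so the driver can reach whichever one is active)
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://spark01:7077,spark02:7077 \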
./examples/jars/spark-examples_2.13-3.4.0.jar \
10
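Spark's standalone documentation describes HA clients passing a comma-separated list of Masters in the form spark://host1:port1,host2:port2, so that registration still succeeds when one Master is down and ZooKeeper has elected another. A small hypothetical helper that builds such a URL from the Master hosts; spark01 and spark02 on port 7077 are assumptions, since the configuration above only names spark01 explicitly.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: builds a standalone master URL that lists every Master,
// e.g. spark://spark01:7077,spark02:7077
final class MasterUrl {
    static String of(List<String> masterHosts, int port) {
        return masterHosts.stream()
                .map(host -> host + ":" + port)
                .collect(Collectors.joining(",", "spark://", ""));
    }

    public static void main(String[] args) {
        // Assumed Master hosts; adjust to wherever start-master.sh actually runs
        System.out.println(of(Arrays.asList("spark01", "spark02"), 7077));
    }
}
```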
Submit a job to YARN (HADOOP_CONF_DIR must point to the Hadoop configuration directory, as configured above)
bin/spark-submit --master yarn \
--class org.apache.spark.examples.SparkPi ./examples/jars/spark-examples_2.13-3.4.0.jar 10
spark-code
spark-core
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.lihaozhe</groupId>
    <artifactId>spark-code</artifactId>
    <version>1.0.0</version>
    <properties>
        <jdk.version>1.8</jdk.version>
        <scala.version>2.13.10</scala.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <commons-lang3.version>3.12.0</commons-lang3.version>
        <java-testdata-generator.version>1.1.2</java-testdata-generator.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>com.github.binarywang</groupId>
            <artifactId>java-testdata-generator</artifactId>
            <version>${java-testdata-generator.version}</version>
        </dependency>
        <!-- spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.13</artifactId>
            <version>3.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.13</artifactId>
            <version>3.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.13</artifactId>
            <version>3.4.0</version>
        </dependency>
        <!-- junit-jupiter-api -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.9.3</version>
            <scope>test</scope>
        </dependency>
        <!-- junit-jupiter-engine -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.9.3</version>
            <scope>test</scope>
        </dependency>
        <!-- Lombok is only needed at compile time, so keep it out of the packaged jar -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.14.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.5</version>
        </dependency>
        <!-- commons-pool2 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.13</artifactId>
            <version>3.4.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>
        <!-- commons-lang3 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${commons-lang3.version}</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.11.0</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>${project.artifactId}</finalName>
        <!--<outputDirectory>../package</outputDirectory>-->
        <plugins>
            <!-- Compiler level -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <!-- Source file encoding -->
                    <encoding>UTF-8</encoding>
                    <!-- JDK version to compile for -->
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-clean-plugin</artifactId>
                <version>3.2.0</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <version>3.3.1</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-war-plugin</artifactId>
                <version>3.3.2</version>
            </plugin>
            <!-- Skip the JUnit tests when packaging -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
            <!-- Compiles Scala code into class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.8.1</version>
                <configuration>
                    <scalaCompatVersion>2.13</scalaCompatVersion>
                    <scalaVersion>2.13.10</scalaVersion>
                </configuration>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.5.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
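hdfs-conf
Put the HDFS client configuration files core-site.xml and hdfs-site.xml in the resources directory.
The files included here belong to the test cluster.
Because the production environment differs from the test environment, exclude these HDFS configuration files when packaging the project.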
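These files matter because Hadoop's Configuration class loads core-site.xml and hdfs-site.xml from the classpath. When they are present, a path without a scheme such as /data/words.txt resolves against fs.defaultFS; without them fs.defaultFS keeps its default of file:///, and the same path is read from the local disk. The following hypothetical check (class name HdfsConfCheck is illustrative, not part of the project) reports whether the files are actually visible at runtime.

```java
import java.net.URL;

// Hypothetical diagnostic: reports whether a resource such as core-site.xml
// can be found on the runtime classpath.
final class HdfsConfCheck {
    static boolean onClasspath(String resource) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = HdfsConfCheck.class.getClassLoader();
        }
        URL url = loader.getResource(resource);
        return url != null;
    }

    public static void main(String[] args) {
        for (String name : new String[]{"core-site.xml", "hdfs-site.xml"}) {
            System.out.println(name + " on classpath: " + onClasspath(name));
        }
    }
}
```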
rdd
Building an RDD from a collection
package com.lihaozhe.course01;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
import java.util.List;
/**
 * Build an RDD from Parallelized Collections
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/17 10:18 AM
 */
public class JavaDemo01 {
    public static void main(String[] args) {
        String appName = "RDD";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // Basic Spark configuration
        SparkConf conf = new SparkConf().setAppName(appName);
        // Run locally
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Dataset
            List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
            // Create an RDD from a collection
            // Parallelized Collections
            JavaRDD<Integer> distData = sc.parallelize(data);
            // Collect the data back to the local driver
            List<Integer> result = distData.collect();
            // Print each element with a method reference
            result.forEach(System.out::println);
        }
    }
}
package com.lihaozhe.course01
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Build an RDD from Parallelized Collections
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/17 10:08 AM
 */
object ScalaDemo01 {
  def main(args: Array[String]): Unit = {
    val appName = "rdd"
    // Basic Spark configuration
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // Run locally
    conf.setMaster("local")
    // Create the SparkContext (the Spark context)
    val sc = new SparkContext(conf)
    // Dataset
    val data = Array(1, 2, 3, 4, 5)
    // Create an RDD from a collection
    // Parallelized Collections
    val distData = sc.parallelize(data)
    // foreach runs on the executors; in local mode the output appears in this console
    distData.foreach(println)
    sc.stop()
  }
}
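parallelize also accepts a second argument, the number of partitions (slices), for example sc.parallelize(data, 2); with setMaster("local") the default is a single partition. The plain-Java sketch below (hypothetical class SliceSketch, no Spark) shows one way to cut a list into contiguous slices using integer start/end boundaries, which is roughly how a parallelized collection is divided among tasks. It is an illustration, not Spark's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: split a list into numSlices contiguous slices,
// slice i covering [i * n / numSlices, (i + 1) * n / numSlices).
final class SliceSketch {
    static <T> List<List<T>> slice(List<T> data, int numSlices) {
        if (numSlices < 1) {
            throw new IllegalArgumentException("numSlices must be >= 1");
        }
        List<List<T>> slices = new ArrayList<>();
        int length = data.size();
        for (int i = 0; i < numSlices; i++) {
            // long arithmetic avoids int overflow for very large lists
            int start = (int) ((long) i * length / numSlices);
            int end = (int) ((long) (i + 1) * length / numSlices);
            slices.add(new ArrayList<>(data.subList(start, end)));
        }
        return slices;
    }

    public static void main(String[] args) {
        System.out.println(slice(Arrays.asList(1, 2, 3, 4, 5), 2));
    }
}
```

With 5 elements and 2 slices this prints [[1, 2], [3, 4, 5]]; asking for more slices than elements simply yields some empty slices.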
Building an RDD from a local file
words.txt
linux shell
java mysql jdbc
hadoop hdfs mapreduce
hive presto
flume kafka
hbase phoenix
scala spark
sqoop flink
linux shell
java mysql jdbc
hadoop hdfs mapreduce
hive presto
flume kafka
hbase phoenix
scala spark
sqoop flink
base phoenix
scala spark
sqoop flink
linux shell
java mysql jdbc
hadoop hdfs mapreduce
java mysql jdbc
hadoop hdfs mapreduce
hive presto
flume kafka
hbase phoenix
scala spark
java mysql jdbc
hadoop hdfs mapreduce
java mysql jdbc
hadoop hdfs mapreduce
hive presto
package com.lihaozhe.course01;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
import java.util.List;
/**
 * Build an RDD from External Datasets
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/17 10:18 AM
 */
public class JavaDemo02 {
    public static void main(String[] args) {
        String appName = "RDD";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // Basic Spark configuration
        SparkConf conf = new SparkConf().setAppName(appName);
        // Run locally
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Create an RDD from external storage
            // External Datasets
            // Use the local file system
            JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/words.txt");
            // Collect the data back to the local driver
            List<String> lines = javaRDD.collect();
            // Print each line with a method reference
            lines.forEach(System.out::println);
        }
    }
}
package com.lihaozhe.course01
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Build an RDD from External Datasets
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/17 10:08 AM
 */
object ScalaDemo02 {
  def main(args: Array[String]): Unit = {
    val appName = "rdd"
    // Basic Spark configuration
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // Run locally
    conf.setMaster("local")
    // Create the SparkContext (the Spark context)
    val sc = new SparkContext(conf)
    // Create an RDD from external storage
    // External Datasets
    // Use the local file system
    val distFile = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/words.txt")
    distFile.foreach(println)
    sc.stop()
  }
}
Building an RDD from an HDFS file
package com.lihaozhe.course01;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.List;
/**
 * Build an RDD from External Datasets
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/17 10:18 AM
 */
public class JavaDemo03 {
    public static void main(String[] args) {
        // Access HDFS as the root user
        System.setProperty("HADOOP_USER_NAME", "root");
        String appName = "RDD";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // Basic Spark configuration
        SparkConf conf = new SparkConf().setAppName(appName);
        // Run locally
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Create an RDD from external storage
            // External Datasets
            // Use HDFS: a path without a scheme resolves against fs.defaultFS from core-site.xml
            // JavaRDD<String> javaRDD = sc.textFile("hdfs://spark01:8020/data/words.txt");
            JavaRDD<String> javaRDD = sc.textFile("/data/words.txt");
            // Collect the data back to the local driver
            List<String> lines = javaRDD.collect();
            // Print each line with a method reference
            lines.forEach(System.out::println);
        }
    }
}
package com.lihaozhe.course01
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Build an RDD from External Datasets
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/17 10:08 AM
 */
object ScalaDemo03 {
  def main(args: Array[String]): Unit = {
    // Access HDFS as the root user
    System.setProperty("HADOOP_USER_NAME", "root")
    val appName = "rdd"
    // Basic Spark configuration
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // Run locally
    conf.setMaster("local")
    // Create the SparkContext (the Spark context)
    val sc = new SparkContext(conf)
    // Create an RDD from external storage
    // External Datasets
    // Use HDFS: a path without a scheme resolves against fs.defaultFS from core-site.xml
    // val distFile = sc.textFile("hdfs://spark01:8020/data/words.txt")
    val distFile = sc.textFile("/data/words.txt")
    distFile.foreach(println)
    sc.stop()
  }
}
Transformations and actions
data.csv
person3,137
person7,193
person7,78
person0,170
person5,145
person5,54
person5,150
person0,102
person0,15
person8,172
person6,177
person5,158
person8,30
person6,184
person5,50
person4,127
person1,197
person3,99
person7,2
person7,51
person9,27
person6,34
person0,18
person7,111
person2,34
person0,80
person3,19
person8,121
person1,38
person3,37
person8,69
person3,116
person5,14
person4,121
person7,13
person8,10
person4,67
person6,177
person8,161
person6,113
person5,161
person3,159
person5,161
person2,88
person3,191
person0,155
person4,55
person6,153
person6,187
person0,41
person3,157
person4,179
person4,95
person1,12
person3,109
person9,24
person9,188
person1,114
person7,9
person7,82
person8,47
person9,153
person7,152
person6,110
person2,73
person8,132
person4,175
person7,153
person9,174
person3,23
person3,103
person9,169
person8,98
person6,62
person2,33
person3,127
person1,91
person6,198
person4,28
person1,182
person0,164
person5,198
person7,22
person0,46
person3,5
person8,140
person3,131
person4,195
person7,86
person0,137
person8,152
person8,154
person7,144
person5,142
person9,147
person1,29
person5,113
person6,173
person6,115
person9,148
person2,114
person7,69
person6,192
person0,113
person5,26
person3,7
person1,2
person6,60
person8,38
person6,19
person4,5
person3,50
person9,179
person2,148
person0,23
person3,121
person9,66
person9,90
person4,166
person7,199
person0,79
person2,157
person5,98
person6,25
person1,100
person4,184
person6,124
person4,183
person3,105
person6,28
person5,141
person6,60
person2,108
person5,171
person7,98
person2,57
person9,18
person8,35
person7,141
person0,180
person2,176
person9,130
person2,26
person0,81
person6,144
person3,33
person4,41
person9,60
person1,99
person4,115
person6,83
person2,90
person7,174
person8,47
person5,62
person0,119
person9,99
person3,125
person3,20
person1,137
person9,74
person6,1
person4,140
person4,122
person1,56
person7,107
person9,131
person7,174
person7,191
person8,31
person4,45
person9,84
person6,38
person9,186
person6,89
person5,87
person9,80
person5,107
person3,175
person8,44
person0,114
person7,63
person3,129
person9,77
person9,86
person9,183
person3,61
person4,104
person2,192
person5,142
person4,124
person5,76
person0,187
person3,38
person7,62
person5,153
person9,149
person7,87
person7,27
person6,88
count
package com.lihaozhe.course02;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
import java.util.List;
/**
 * count operator
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/17 10:18 AM
 */
public class JavaDemo01 {
    public static void main(String[] args) {
        String appName = "count";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // Basic Spark configuration
        SparkConf conf = new SparkConf().setAppName(appName);
        // Run locally
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Dataset
            List<Integer> data = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
            // Create an RDD from a collection
            // Parallelized Collections
            JavaRDD<Integer> distData = sc.parallelize(data);
            // count is an action: it runs the job and returns the number of elements
            long count = distData.count();
            System.out.println("count = " + count);
        }
    }
}
package com.lihaozhe.course02
import org.apache.spark.{SparkConf, SparkContext}
/**
 * count operator
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/17 10:08 AM
 */
object ScalaDemo01 {
  def main(args: Array[String]): Unit = {
    val appName = "count"
    // Basic Spark configuration
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // Run locally
    conf.setMaster("local")
    // Create the SparkContext (the Spark context)
    val sc = new SparkContext(conf)
    // Dataset
    val data = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    // Create an RDD from a collection
    // Parallelized Collections
    val distData = sc.parallelize(data)
    // count is an action: it runs the job and returns the number of elements
    val count = distData.count()
    println(s"count = ${count}")
    sc.stop()
  }
}
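count is an action: it makes Spark actually run a job and return a result to the driver. Transformations such as map, filter, flatMap and distinct only describe how a new RDD is derived and do no work until an action runs. Java streams behave analogously, so the following sketch (hypothetical class LazyDemo, plain Java, no Spark) uses a call counter to show that the map function has not run before the terminal step.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: a stream's intermediate operations are lazy, like RDD transformations.
final class LazyDemo {
    // Returns {map calls before the terminal step, result size, map calls after it}
    static int[] run(List<Integer> data) {
        AtomicInteger calls = new AtomicInteger();
        // Intermediate operation: only describes the work, like an RDD transformation
        Stream<Integer> doubled = data.stream().map(n -> {
            calls.incrementAndGet();
            return n * 2;
        });
        int before = calls.get();
        // Terminal operation: runs the pipeline, like an RDD action such as collect
        List<Integer> result = doubled.collect(Collectors.toList());
        int after = calls.get();
        return new int[]{before, result.size(), after};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))));
    }
}
```

collect is used as the terminal step on purpose: since Java 9, Stream.count() may skip a map stage entirely when the size is already known, which would hide the effect being demonstrated.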
take
package com.lihaozhe.course02;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
import java.util.List;
/**
 * take operator
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/17 10:18 AM
 */
public class JavaDemo02 {
    public static void main(String[] args) {
        String appName = "take";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // Basic Spark configuration
        SparkConf conf = new SparkConf().setAppName(appName);
        // Run locally
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Dataset
            List<Integer> data = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
            // Create an RDD from a collection
            // Parallelized Collections
            JavaRDD<Integer> distData = sc.parallelize(data);
            // take(n) is an action that returns the first n elements to the driver
            List<Integer> topList = distData.take(3);
            topList.forEach(System.out::println);
        }
    }
}
package com.lihaozhe.course02
import org.apache.spark.{SparkConf, SparkContext}
/**
 * take operator
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/17 10:08 AM
 */
object ScalaDemo02 {
  def main(args: Array[String]): Unit = {
    val appName = "take"
    // Basic Spark configuration
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // Run locally
    conf.setMaster("local")
    // Create the SparkContext (the Spark context)
    val sc = new SparkContext(conf)
    // Dataset
    val data = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    // Create an RDD from a collection
    // Parallelized Collections
    val distData = sc.parallelize(data)
    // take(n) is an action that returns the first n elements to the driver as a local Array
    val top = distData.take(3)
    top.foreach(println)
    sc.stop()
  }
}
distinct
package com.lihaozhe.course02;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
import java.util.List;
/**
 * distinct operator
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/17 10:18 AM
 */
public class JavaDemo03 {
    public static void main(String[] args) {
        String appName = "distinct";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // Basic Spark configuration
        SparkConf conf = new SparkConf().setAppName(appName);
        // Run locally
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Dataset (contains duplicates)
            List<Integer> data = Arrays.asList(0, 1, 5, 6, 7, 8, 9, 3, 4, 2, 4, 3);
            // Create an RDD from a collection
            // Parallelized Collections
            JavaRDD<Integer> distData = sc.parallelize(data);
            // Remove duplicates; the output order is not guaranteed
            JavaRDD<Integer> uniqueRDD = distData.distinct();
            List<Integer> uniqueList = uniqueRDD.collect();
            uniqueList.forEach(System.out::println);
        }
    }
}
package com.lihaozhe.course02
import org.apache.spark.{SparkConf, SparkContext}
/**
 * distinct operator
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/17 10:08 AM
 */
object ScalaDemo03 {
  def main(args: Array[String]): Unit = {
    val appName = "distinct"
    // Basic Spark configuration
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // Run locally
    conf.setMaster("local")
    // Create the SparkContext (the Spark context)
    val sc = new SparkContext(conf)
    // Dataset (contains duplicates)
    val data = Array(0, 1, 5, 6, 7, 8, 9, 3, 4, 2, 4, 3)
    // Create an RDD from a collection
    // Parallelized Collections
    val distData = sc.parallelize(data)
    // Remove duplicates; the output order is not guaranteed
    val uniqueData = distData.distinct()
    uniqueData.foreach(println)
    sc.stop()
  }
}
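In the general case Spark's RDD source builds distinct out of key-based aggregation, roughly map(x => (x, null)).reduceByKey((x, _) => x).map(_._1), which is why distinct triggers a shuffle and why the output order differs from the input. The sketch below replays those three steps on a local list in plain Java; the class DistinctSketch is hypothetical and only mirrors the idea, not Spark's code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical local analogue of distinct expressed as key-based aggregation (no Spark).
final class DistinctSketch {
    static <T> Set<T> distinct(List<T> data) {
        Map<T, Boolean> byKey = new HashMap<>();
        for (T element : data) {
            // Use the element as the key and keep only the first value seen for it,
            // similar to map(x => (x, null)).reduceByKey((x, _) => x)
            byKey.merge(element, Boolean.TRUE, (kept, ignored) -> kept);
        }
        // Keep just the keys, similar to map(_._1)
        return new HashSet<>(byKey.keySet());
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(0, 1, 5, 6, 7, 8, 9, 3, 4, 2, 4, 3);
        System.out.println(distinct(data).size());
    }
}
```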
map
package com.lihaozhe.course02;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
import java.util.List;
/**
 * map operator
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/17 10:18 AM
 */
public class JavaDemo04 {
    public static void main(String[] args) {
        String appName = "map";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // Basic Spark configuration
        SparkConf conf = new SparkConf().setAppName(appName);
        // Run locally
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Dataset
            List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
            // Create an RDD from a collection
            // Parallelized Collections
            JavaRDD<Integer> distData = sc.parallelize(data);
            // Apply the function to every element: one input element, one output element
            JavaRDD<Integer> rs = distData.map(num -> num * 2);
            List<Integer> list = rs.collect();
            list.forEach(System.out::println);
        }
    }
}
package com.lihaozhe.course02
import org.apache.spark.{SparkConf, SparkContext}
/**
 * map operator
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/17 10:08 AM
 */
object ScalaDemo04 {
  def main(args: Array[String]): Unit = {
    val appName = "map"
    // Basic Spark configuration
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // Run locally
    conf.setMaster("local")
    // Create the SparkContext (the Spark context)
    val sc = new SparkContext(conf)
    // Dataset
    val data = Array(1, 2, 3, 4, 5)
    // Create an RDD from a collection
    // Parallelized Collections
    val distData = sc.parallelize(data)
    // Apply the function to every element: one input element, one output element
    val rs = distData.map(_ * 2)
    rs.foreach(println)
    sc.stop()
  }
}
flatMap
package com.lihaozhe.course02;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
 * flatMap operator
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/17 10:18 AM
 */
public class JavaDemo05 {
    public static void main(String[] args) {
        String appName = "flatMap";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // Basic Spark configuration
        SparkConf conf = new SparkConf().setAppName(appName);
        // Run locally
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Dataset
            List<String> data = Arrays.asList("hadoop hive presto", "hbase phoenix", "spark flink");
            // Create an RDD from a collection
            // Parallelized Collections
            JavaRDD<String> javaRDD = sc.parallelize(data);
            // javaRDD.flatMap(new FlatMapFunction<String, Object>() {
            //     @Override
            //     public Iterator<Object> call(String s) throws Exception {
            //         return null;
            //     }
            // });
            // Each line becomes several words; the results are flattened into one RDD
            JavaRDD<String> wordsRdd = javaRDD.flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator());
            List<String> words = wordsRdd.collect();
            words.forEach(System.out::println);
        }
    }
}
package com.lihaozhe.course02
import org.apache.spark.{SparkConf, SparkContext}
/**
 * flatMap operator
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/17 10:08 AM
 */
object ScalaDemo05 {
  def main(args: Array[String]): Unit = {
    val appName = "flatMap"
    // Basic Spark configuration
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // Run locally
    conf.setMaster("local")
    // Create the SparkContext (the Spark context)
    val sc = new SparkContext(conf)
    // Dataset
    val data = Array("hadoop hive presto", "hbase phoenix", "spark flink")
    // Create an RDD from a collection
    // Parallelized Collections
    val distData = sc.parallelize(data)
    // ("hadoop","hive","presto","hbase","phoenix","spark","flink")
    val rs = distData.flatMap(_.split(" "))
    rs.foreach(println)
    sc.stop()
  }
}
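The difference between map and flatMap is easiest to see side by side: map turns each line into exactly one element (here, a list of words), while flatMap turns each line into zero or more elements and flattens them. This plain-Java sketch uses Java streams on the same three lines; the class FlatMapSketch is hypothetical and no Spark is involved.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical local comparison of map vs flatMap using Java streams (not Spark).
final class FlatMapSketch {
    // map: one output element per line, so the result is a list of lists
    static List<List<String>> mapSplit(List<String> lines) {
        return lines.stream()
                .map(line -> Arrays.asList(line.split(" ")))
                .collect(Collectors.toList());
    }

    // flatMap: every word becomes its own element in a single flat list
    static List<String> flatMapSplit(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hadoop hive presto", "hbase phoenix", "spark flink");
        System.out.println(mapSplit(lines));
        System.out.println(flatMapSplit(lines));
    }
}
```

With the three lines above, mapSplit yields 3 lists and flatMapSplit yields 7 words.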
filter
package com.lihaozhe.course02;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
import java.util.List;
/**
 * filter operator
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/17 10:18 AM
 */
public class JavaDemo06 {
    public static void main(String[] args) {
        String appName = "filter";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // Basic Spark configuration
        SparkConf conf = new SparkConf().setAppName(appName);
        // Run locally
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Dataset
            List<Integer> data = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
            // Create an RDD from a collection
            // Parallelized Collections
            JavaRDD<Integer> distData = sc.parallelize(data);
            // Keep only the elements for which the predicate is true (even numbers)
            JavaRDD<Integer> evenRDD = distData.filter(num -> num % 2 == 0);
            List<Integer> evenList = evenRDD.collect();
            evenList.forEach(System.out::println);
        }
    }
}
package com.lihaozhe.course02
import org.apache.spark.{SparkConf, SparkContext}
/**
 * filter operator
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/17 10:08 AM
 */
object ScalaDemo06 {
  def main(args: Array[String]): Unit = {
    val appName = "filter"
    // Basic Spark configuration
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // Run locally
    conf.setMaster("local")
    // Create the SparkContext (the Spark context)
    val sc = new SparkContext(conf)
    // Dataset
    val data = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    // Create an RDD from a collection
    // Parallelized Collections
    val distData = sc.parallelize(data)
    // Keep only the elements for which the predicate is true (even numbers)
    val evenData = distData.filter(_ % 2 == 0)
    evenData.foreach(println)
    sc.stop()
  }
}
groupByKey
package com.lihaozhe.course02;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.List;
import java.util.StringJoiner;
/**
 * groupByKey operator
 * Input file data.csv: the first column is the person's name, the second is the amount of one order.
 * Groups all order amounts by person.
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/17 10:18 AM
 */
public class JavaDemo07 {
    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "root");
        String appName = "groupByKey";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // Basic Spark configuration
        SparkConf conf = new SparkConf().setAppName(appName);
        // Run locally
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Create an RDD from external storage
            // External Datasets
            // Use the local file system
            // JavaRDD<String> javaRDD = sc.textFile("hdfs://spark01:8020/data/words.txt");
            JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv");
            // javaRDD.mapToPair(new PairFunction<String, String, Integer>() {
            //     @Override
            //     public Tuple2<String, Integer> call(String s) throws Exception {
            //         return null;
            //     }
            // });
            // "person3,137" -> (person3, 137)
            JavaPairRDD<String, Integer> javaPairRDD = javaRDD.mapToPair((PairFunction<String, String, Integer>) line -> {
                String[] fields = line.split(",");
                return new Tuple2<>(fields[0], Integer.parseInt(fields[1]));
            });
            JavaPairRDD<String, Iterable<Integer>> groupRDD = javaPairRDD.groupByKey();
            List<Tuple2<String, Iterable<Integer>>> collect = groupRDD.collect();
            collect.forEach(tup -> {
                // Build "(a,b,c)" directly instead of erasing a trailing comma with "\b"
                StringJoiner amounts = new StringJoiner(",", "(", ")");
                tup._2.forEach(num -> amounts.add(String.valueOf(num)));
                System.out.println(tup._1 + " >>> " + amounts);
            });
        }
    }
}
package com.lihaozhe.course02
import org.apache.spark.{SparkConf, SparkContext}
/**
 * groupByKey operator
 * Input file data.csv: the first column is the person's name, the second is the amount of one order.
 * Groups all order amounts by person.
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/17 10:08 AM
 */
object ScalaDemo07 {
  def main(args: Array[String]): Unit = {
    val appName = "groupByKey"
    // Basic Spark configuration
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // Run locally
    conf.setMaster("local")
    // Create the SparkContext (the Spark context)
    val sc = new SparkContext(conf)
    // Create an RDD from external storage
    // External Datasets
    // Use the local file system
    val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    // distData.foreach(println)
    // (person3,137)
    val tupleData = distData.map(line => {
      val fields = line.split(",")
      (fields(0), fields(1).toInt)
    })
    // (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))
    val groupData = tupleData.groupByKey()
    groupData.foreach(println)
    sc.stop()
  }
}
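What mapToPair followed by groupByKey produces can be replayed on a few lines of data.csv without Spark: each "name,amount" line becomes a (name, amount) pair, and all amounts for the same name are collected into one list. The class GroupSketch below is a hypothetical local analogue; the TreeMap only makes the printed order predictable, whereas Spark gives no ordering guarantee for grouped keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local analogue of mapToPair + groupByKey on "name,amount" lines (no Spark).
final class GroupSketch {
    static Map<String, List<Integer>> group(List<String> lines) {
        // TreeMap only for a predictable print order
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            // Like mapToPair: (name, amount); like groupByKey: append to that name's list
            groups.computeIfAbsent(fields[0], name -> new ArrayList<>())
                    .add(Integer.parseInt(fields[1].trim()));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList("person3,137", "person7,193", "person7,78");
        System.out.println(group(sample));
    }
}
```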
reduceByKey
package com.lihaozhe.course02;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.List;
/**
 * reduceByKey operator
 * Input file data.csv: the first column is the person's name, the second is the amount of one order.
 * Sums the order amounts for each person.
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/17 10:18 AM
 */
public class JavaDemo08 {
    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "root");
        String appName = "reduceByKey";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // Basic Spark configuration
        SparkConf conf = new SparkConf().setAppName(appName);
        // Run locally
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Create an RDD from external storage
            // External Datasets
            // Use the local file system
            // JavaRDD<String> javaRDD = sc.textFile("hdfs://spark01:8020/data/words.txt");
            JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv");
            // "person3,137" -> (person3, 137)
            JavaPairRDD<String, Integer> javaPairRDD = javaRDD.mapToPair((PairFunction<String, String, Integer>) line -> {
                String[] fields = line.split(",");
                return new Tuple2<>(fields[0], Integer.parseInt(fields[1]));
            });
            // JavaPairRDD<String, Integer> reduceRDD = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            //     @Override
            //     public Integer call(Integer integer, Integer integer2) throws Exception {
            //         return integer + integer2;
            //     }
            // });
            JavaPairRDD<String, Integer> reduceRDD = javaPairRDD.reduceByKey((Function2<Integer, Integer, Integer>) Integer::sum);
            List<Tuple2<String, Integer>> collect = reduceRDD.collect();
            collect.forEach(tup -> System.out.println(tup._1 + " >>> " + tup._2));
        }
    }
}
package com.lihaozhe.course02
import org.apache.spark.{SparkConf, SparkContext}
/**
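* reduceByKey operator
* Input file data.csv: the first column is the customer name, the second column is the amount of a single order; compute each customer's total amount
*
* @author 李昊哲
* @version 1.0.0 2023/5/17 10:08 AM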
*/
object ScalaDemo08 {
def main(args: Array[String]): Unit = {
val appName = "reduceByKey"
// Basic Spark configuration
// val conf = new SparkConf().setAppName(appName).setMaster("local")
val conf = new SparkConf().setAppName(appName)
// Run locally
conf.setMaster("local")
// Build the SparkContext
val sc = new SparkContext(conf)
// Create an RDD from an external dataset
// External Datasets
// Use the local file system
val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
// distData.foreach(println)
// (person3,137)
val tupleData = distData.map(line => (line.split(",")(0), line.split(",")(1).toInt))
// reduceByKey adds up all amounts that share a name, yielding (name, total amount)
val sumData = tupleData.reduceByKey(_ + _)
sumData.foreach(println)
}
}
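reduceByKey needs a function that is associative and commutative. Spark first combines values inside each partition and then merges those partial results across partitions. The plain-Java sketch below models this two-step combine on hypothetical in-memory "partitions", each a list of `name,amount` lines. `ReduceByKeySketch` is an illustrative name, and the partition split is an assumption; it is not how Spark actually divides data.csv.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical sketch of reduceByKey semantics: combine inside each partition, then merge partitions
final class ReduceByKeySketch {
    // Combine the "name,amount" lines of one partition with the given function
    static Map<String, Integer> combine(List<String> partition, BinaryOperator<Integer> func) {
        Map<String, Integer> local = new HashMap<>();
        for (String line : partition) {
            String[] fields = line.split(",");
            local.merge(fields[0], Integer.parseInt(fields[1].trim()), func);
        }
        return local;
    }

    // Merge the per-partition results; func must be associative and commutative
    static Map<String, Integer> reduceByKey(List<List<String>> partitions, BinaryOperator<Integer> func) {
        Map<String, Integer> result = new HashMap<>();
        for (List<String> partition : partitions) {
            combine(partition, func).forEach((name, partial) -> result.merge(name, partial, func));
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = List.of(
                List.of("person1,197", "person2,50"),
                List.of("person1,38", "person2,12"));
        // person1 -> 235, person2 -> 62 (HashMap print order is not guaranteed)
        System.out.println(reduceByKey(partitions, Integer::sum));
    }
}
```

Passing `Integer::max` instead of `Integer::sum` gives the largest single order per customer, which matches `reduceByKey(math.max(_, _))` in Scala. A non-associative function such as subtraction would give results that depend on how the data is partitioned.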
mapValues
package com.lihaozhe.course02;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
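* mapValues operator
* Input file data.csv: the first column is the customer name, the second column is the amount of a single order; compute each customer's average order value
*
* @author 李昊哲
* @version 1.0.0 2023/5/17 10:18 AM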
*/
public class JavaDemo09 {
public static void main(String[] args) {
System.setProperty("HADOOP_USER_NAME", "root");
String appName = "mapValues";
// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
// Basic Spark configuration
SparkConf conf = new SparkConf().setAppName(appName);
// Run locally
conf.setMaster("local");
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
// Create an RDD from an external dataset
// External Datasets
// Use the local file system
// JavaRDD<String> javaRDD = sc.textFile("hdfs://spark01:8020/data/words.txt");
JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv");
// javaRDD.mapToPair(new PairFunction<String, String, Integer>() {
// @Override
// public Tuple2<String, Integer> call(String s) throws Exception {
// return null;
// }
// });
JavaPairRDD<String, Integer> javaPairRDD = javaRDD.mapToPair((PairFunction<String, String, Integer>) word -> {
String[] words = word.split(",");
return new Tuple2<String, Integer>(words[0], Integer.parseInt(words[1]));
});
JavaPairRDD<String, Iterable<Integer>> groupRDD = javaPairRDD.groupByKey();
JavaPairRDD<String, Double> avgRDD = groupRDD.mapValues(v -> {
int sum = 0;
Iterator<Integer> it = v.iterator();
AtomicInteger atomicInteger = new AtomicInteger();
while (it.hasNext()) {
Integer amount = it.next();
sum += amount;
atomicInteger.incrementAndGet();
}
return (double) sum / atomicInteger.get();
});
List<Tuple2<String, Double>> collect = avgRDD.collect();
collect.forEach(tup -> System.out.println(tup._1 + " >>> " + tup._2));
// Map<String, List<Tuple2<String, Integer>>> listMap = javaPairRDD.collect().stream().collect(Collectors.groupingBy(tup -> tup._1));
// Set<Map.Entry<String, List<Tuple2<String, Integer>>>> entries = listMap.entrySet();
// Iterator<Map.Entry<String, List<Tuple2<String, Integer>>>> it = entries.iterator();
// Map<String, Double> map = new HashMap<>();
// while (it.hasNext()) {
// Map.Entry<String, List<Tuple2<String, Integer>>> entry = it.next();
// Integer sum = entry.getValue().stream().map(tup -> tup._2).reduce(Integer::sum).orElse(0);
// long count = entry.getValue().stream().map(tup -> tup._2).count();
//
// map.put(entry.getKey(), Double.valueOf(sum) / count);
// }
// map.forEach((name, amount) -> System.out.println(name + " >>> " + amount));
}
}
}
package com.lihaozhe.course02
import org.apache.spark.{SparkConf, SparkContext}
/**
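* mapValues operator
* Input file data.csv: the first column is the customer name, the second column is the amount of a single order; compute each customer's average order value
*
* @author 李昊哲
* @version 1.0.0 2023/5/17 10:08 AM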
*/
object ScalaDemo09 {
def main(args: Array[String]): Unit = {
val appName = "mapValues"
// Basic Spark configuration
// val conf = new SparkConf().setAppName(appName).setMaster("local")
val conf = new SparkConf().setAppName(appName)
// Run locally
conf.setMaster("local")
// Build the SparkContext
val sc = new SparkContext(conf)
// Create an RDD from an external dataset
// External Datasets
// Use the local file system
val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
// distData.foreach(println)
// (person3,137)
val tupleData = distData.map(line => (line.split(",")(0), line.split(",")(1).toInt))
// (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))
val groupData = tupleData.groupByKey()
groupData.foreach(println)
// mapValues keeps the key and transforms only the value: the average amount as a string with 2 decimals
val avgData = groupData.mapValues(v => "%.2f".format(v.sum.toDouble / v.size))
avgData.foreach(println)
}
}
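The mapValues examples compute the average after groupByKey. A common alternative keeps a running (sum, count) pair per key and divides only at the end. In Spark this corresponds roughly to `tupleData.mapValues(v => (v, 1)).reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).mapValues { case (s, c) => s.toDouble / c }`. The plain-Java sketch below shows that idea, assuming the same `name,amount` input. `AverageByKeySketch` and `SumCount` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: per-key average via (sum, count) pairs instead of groupByKey
final class AverageByKeySketch {
    // Per-key partial result: total amount and number of orders
    static final class SumCount {
        final long sum;
        final long count;
        SumCount(long sum, long count) { this.sum = sum; this.count = count; }
        // Merging two partial results is associative, like a reduceByKey function
        SumCount merge(SumCount other) { return new SumCount(sum + other.sum, count + other.count); }
    }

    static Map<String, Double> averageByKey(List<String> lines) {
        Map<String, SumCount> acc = new LinkedHashMap<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            // Each amount becomes (amount, 1), like mapValues(v => (v, 1))
            acc.merge(fields[0], new SumCount(Integer.parseInt(fields[1].trim()), 1), SumCount::merge);
        }
        // Final mapValues step: divide the sum by the count
        Map<String, Double> averages = new LinkedHashMap<>();
        acc.forEach((name, sc) -> averages.put(name, (double) sc.sum / sc.count));
        return averages;
    }

    public static void main(String[] args) {
        // prints {person1=117.5, person2=50.0}
        System.out.println(averageByKey(List.of("person1,197", "person2,50", "person1,38")));
    }
}
```

Merging two (sum, count) pairs is associative, so partial results can be combined in any order. That property is what lets reduceByKey shrink the data before the shuffle.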
sortByKey
package com.lihaozhe.course02;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.List;
/**
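* sortByKey and reduceByKey operators
* Input file data.csv: the first column is the customer name, the second column is the amount of a single order; compute the total amount spent by each person, then sort by name in descending order
*
* @author 李昊哲
* @version 1.0.0 2023/5/17 10:18 AM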
*/
public class JavaDemo10 {
public static void main(String[] args) {
System.setProperty("HADOOP_USER_NAME", "root");
String appName = "sortByKey reduceByKey";
// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
// Basic Spark configuration
SparkConf conf = new SparkConf().setAppName(appName);
// Run locally
conf.setMaster("local");
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
// Create an RDD from an external dataset
// External Datasets
// Use the local file system
// JavaRDD<String> javaRDD = sc.textFile("hdfs://spark01:8020/data/words.txt");
JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv");
JavaPairRDD<String, Integer> javaPairRDD = javaRDD.mapToPair((PairFunction<String, String, Integer>) word -> {
String[] words = word.split(",");
return new Tuple2<String, Integer>(words[0], Integer.parseInt(words[1]));
});
// JavaPairRDD<String, Integer> reduceRDD = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
// @Override
// public Integer call(Integer integer, Integer integer2) throws Exception {
// return integer + integer2;
// }
// });
JavaPairRDD<String, Integer> reduceRDD = javaPairRDD.reduceByKey((Function2<Integer, Integer, Integer>) Integer::sum);
// sortByKey(false) sorts by the key (the name) in descending order
JavaPairRDD<String, Integer> sortedRDD = reduceRDD.sortByKey(false);
List<Tuple2<String, Integer>> collect = sortedRDD.collect();
collect.forEach(tup -> System.out.println(tup._1 + " >>> " + tup._2));
}
}
}
package com.lihaozhe.course02;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
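* sortByKey operator (applied after groupByKey and mapValues)
* Input file data.csv: the first column is the customer name, the second column is the amount of a single order; compute each customer's average order value, then sort by name in descending order
*
* @author 李昊哲
* @version 1.0.0 2023/5/17 10:18 AM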
*/
public class JavaDemo11 {
public static void main(String[] args) {
System.setProperty("HADOOP_USER_NAME", "root");
String appName = "sortByKey reduceByKey";
// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
// Basic Spark configuration
SparkConf conf = new SparkConf().setAppName(appName);
// Run locally
conf.setMaster("local");
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
// Create an RDD from an external dataset
// External Datasets
// Use the local file system
// JavaRDD<String> javaRDD = sc.textFile("hdfs://spark01:8020/data/words.txt");
JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv");
// javaRDD.mapToPair(new PairFunction<String, String, Integer>() {
// @Override
// public Tuple2<String, Integer> call(String s) throws Exception {
// return null;
// }
// });
JavaPairRDD<String, Integer> javaPairRDD = javaRDD.mapToPair((PairFunction<String, String, Integer>) word -> {
String[] words = word.split(",");
return new Tuple2<String, Integer>(words[0], Integer.parseInt(words[1]));
});
JavaPairRDD<String, Iterable<Integer>> groupRDD = javaPairRDD.groupByKey();
JavaPairRDD<String, Double> avgRDD = groupRDD.mapValues(v -> {
int sum = 0;
Iterator<Integer> it = v.iterator();
AtomicInteger atomicInteger = new AtomicInteger();
while (it.hasNext()) {
Integer amount = it.next();
sum += amount;
atomicInteger.incrementAndGet();
}
return (double) sum / atomicInteger.get();
});
// sortByKey(false) sorts by the key (the name) in descending order, not by the average
JavaPairRDD<String, Double> sortedRDD = avgRDD.sortByKey(false);
List<Tuple2<String, Double>> collect = sortedRDD.collect();
collect.forEach(tup -> System.out.println(tup._1 + " >>> " + tup._2));
// Map<String, List<Tuple2<String, Integer>>> listMap = javaPairRDD.collect().stream().collect(Collectors.groupingBy(tup -> tup._1));
// Set<Map.Entry<String, List<Tuple2<String, Integer>>>> entries = listMap.entrySet();
// Iterator<Map.Entry<String, List<Tuple2<String, Integer>>>> it = entries.iterator();
// Map<String, Double> map = new HashMap<>();
// while (it.hasNext()) {
// Map.Entry<String, List<Tuple2<String, Integer>>> entry = it.next();
// Integer sum = entry.getValue().stream().map(tup -> tup._2).reduce(Integer::sum).orElse(0);
// long count = entry.getValue().stream().map(tup -> tup._2).count();
//
// map.put(entry.getKey(), Double.valueOf(sum) / count);
// }
// map.forEach((name, amount) -> System.out.println(name + " >>> " + amount));
}
}
}
package com.lihaozhe.course02
import org.apache.spark.{SparkConf, SparkContext}
/**
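* sortByKey and reduceByKey operators
* Input file data.csv: the first column is the customer name, the second column is the amount of a single order; compute each customer's total amount and sort the totals in descending order
*
* @author 李昊哲
* @version 1.0.0 2023/5/17 10:08 AM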
*/
object ScalaDemo10 {
def main(args: Array[String]): Unit = {
val appName = "sortByKey reduceByKey"
// Basic Spark configuration
// val conf = new SparkConf().setAppName(appName).setMaster("local")
val conf = new SparkConf().setAppName(appName)
// Run locally
conf.setMaster("local")
// Build the SparkContext
val sc = new SparkContext(conf)
// Create an RDD from an external dataset
// External Datasets
// Use the local file system
val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
// distData.foreach(println)
// (person3,137)
val tupleData = distData.map(line => (line.split(",")(0), line.split(",")(1).toInt))
// reduceByKey adds up all amounts that share a name, yielding (name, total amount)
val sumData = tupleData.reduceByKey(_ + _)
sumData.foreach(println)
// Swap (name, total) into (total, name) so that sortByKey sorts by the total
val swapData = sumData.map(_.swap)
// ascending: true sorts ascending, false sorts descending; the default is true
val sortData = swapData.sortByKey(ascending = false)
sortData.foreach(println)
}
}
package com.lihaozhe.course02
import org.apache.spark.{SparkConf, SparkContext}
/**
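* sortByKey operator
* Input file data.csv: the first column is the customer name, the second column is the amount of a single order; compute each customer's average order value and sort the averages in descending order
*
* @author 李昊哲
* @version 1.0.0 2023/5/17 10:08 AM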
*/
object ScalaDemo11 {
def main(args: Array[String]): Unit = {
val appName = "sortByKey reduceByKey"
// Basic Spark configuration
// val conf = new SparkConf().setAppName(appName).setMaster("local")
val conf = new SparkConf().setAppName(appName)
// Run locally
conf.setMaster("local")
// Build the SparkContext
val sc = new SparkContext(conf)
// Create an RDD from an external dataset
// External Datasets
// Use the local file system
val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
// distData.foreach(println)
// (person3,137)
val tupleData = distData.map(line => (line.split(",")(0), line.split(",")(1).toInt))
// (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))
val groupData = tupleData.groupByKey()
groupData.foreach(println)
val avgData = groupData.mapValues(v => v.sum.toDouble / v.size)
avgData.foreach(println)
// Swap (name, average) into (average, name) so that sortByKey sorts by the average
val swapData = avgData.map(_.swap)
// ascending: true sorts ascending, false sorts descending; the default is true
val sortData = swapData.sortByKey(ascending = false)
sortData.foreach(v => println(v._2 + "," + "%.2f".format(v._1)))
}
}
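ScalaDemo10 and ScalaDemo11 sort by value using a swap trick. `_.swap` turns (name, value) into (value, name), so sortByKey ends up ordering by the value. The plain-Java sketch below reproduces that idea on a local map of totals. `SwapSortSketch` and the sample totals are hypothetical, and the sketch assumes the totals are distinct.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the swap + sortByKey(ascending = false) trick on local data
final class SwapSortSketch {
    static List<Map.Entry<Integer, String>> sortByValueDesc(Map<String, Integer> totals) {
        List<Map.Entry<Integer, String>> swapped = new ArrayList<>();
        // _.swap: (name, total) becomes (total, name), so the total is now the key
        totals.forEach((name, total) -> swapped.add(new SimpleEntry<>(total, name)));
        // sortByKey(ascending = false): order by the new key, largest first
        swapped.sort(Map.Entry.<Integer, String>comparingByKey(Comparator.reverseOrder()));
        return swapped;
    }

    public static void main(String[] args) {
        Map<String, Integer> totals = Map.of("person1", 500, "person2", 120, "person3", 310);
        // prints 500 person1, 310 person3, 120 person2 (one per line)
        sortByValueDesc(totals).forEach(e -> System.out.println(e.getKey() + " " + e.getValue()));
    }
}
```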
sortBy
package com.lihaozhe.course02
import org.apache.spark.{SparkConf, SparkContext}
/**
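* sortBy and reduceByKey operators
* Input file data.csv: the first column is the customer name, the second column is the amount of a single order; compute each customer's total amount and sort by the total in descending order
*
* @author 李昊哲
* @version 1.0.0 2023/5/17 10:08 AM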
*/
object ScalaDemo12 {
def main(args: Array[String]): Unit = {
val appName = "sortBy reduceByKey"
// Basic Spark configuration
// val conf = new SparkConf().setAppName(appName).setMaster("local")
val conf = new SparkConf().setAppName(appName)
// Run locally
conf.setMaster("local")
// Build the SparkContext
val sc = new SparkContext(conf)
// Create an RDD from an external dataset
// External Datasets
// Use the local file system
val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
// distData.foreach(println)
// (person3,137)
val tupleData = distData.map(line => (line.split(",")(0), line.split(",")(1).toInt))
// reduceByKey adds up all amounts that share a name, yielding (name, total amount)
val sumData = tupleData.reduceByKey(_ + _)
sumData.foreach(println)
// sortBy takes a key function (here the total, _._2); ascending: true sorts ascending, false sorts descending, default true
val sortedData = sumData.sortBy(_._2, ascending = false)
sortedData.foreach(println)
}
}
package com.lihaozhe.course02
import org.apache.spark.{SparkConf, SparkContext}
/**
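* sortBy operator
* Input file data.csv: the first column is the customer name, the second column is the amount of a single order; compute each customer's average order value and sort by it in descending order
*
* @author 李昊哲
* @version 1.0.0 2023/5/17 10:08 AM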
*/
object ScalaDemo13 {
def main(args: Array[String]): Unit = {
val appName = "sortBy reduceByKey"
// Basic Spark configuration
// val conf = new SparkConf().setAppName(appName).setMaster("local")
val conf = new SparkConf().setAppName(appName)
// Run locally
conf.setMaster("local")
// Build the SparkContext
val sc = new SparkContext(conf)
// Create an RDD from an external dataset
// External Datasets
// Use the local file system
val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
// distData.foreach(println)
// (person3,137)
val tupleData = distData.map(line => (line.split(",")(0), line.split(",")(1).toInt))
// (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))
val groupData = tupleData.groupByKey()
groupData.foreach(println)
// Average per customer, rounded to 2 decimals via string formatting and converted back to Double
val avgData = groupData.mapValues(v => "%.2f".format(v.sum.toDouble / v.size).toDouble)
// ascending: true sorts ascending, false sorts descending; the default is true
val sortedData = avgData.sortBy(_._2, ascending = false)
sortedData.foreach(println)
}
}
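sortBy removes the need for the swap. It takes a function that extracts the sort key from each element (here `_._2`). In Spark's RDD source, sortBy is implemented as keyBy followed by sortByKey, so the two approaches do similar work. The plain-Java sketch below shows the same ordering on a local map and then keeps the top n names. `SortBySketch` and `topNamesByValue` are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of sortBy(_._2, ascending = false) followed by keeping the top n
final class SortBySketch {
    static List<String> topNamesByValue(Map<String, Double> averages, int n) {
        return averages.entrySet().stream()
                // sortBy(_._2, ascending = false): the key function is the value, largest first
                .sorted(Map.Entry.<String, Double>comparingByValue().reversed())
                .limit(n)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Double> averages = Map.of("person1", 88.08, "person2", 120.5, "person3", 64.0);
        // prints [person2, person1]
        System.out.println(topNamesByValue(averages, 2));
    }
}
```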
join
package com.lihaozhe.course02
import org.apache.spark.{SparkConf, SparkContext}
/**
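* join operator
* Input file data.csv: the first column is the customer name, the second column is the amount of a single order; join each customer's total amount with their average order value
*
* @author 李昊哲
* @version 1.0.0 2023/5/17 10:08 AM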
*/
object ScalaDemo14 {
def main(args: Array[String]): Unit = {
val appName = "join"
// Basic Spark configuration
// val conf = new SparkConf().setAppName(appName).setMaster("local")
val conf = new SparkConf().setAppName(appName)
// Run locally
conf.setMaster("local")
// Build the SparkContext
val sc = new SparkContext(conf)
// Create an RDD from an external dataset
// External Datasets
// Use the local file system
val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
// distData.foreach(println)
// (person3,137)
val tupleData = distData.map(line => (line.split(",")(0), line.split(",")(1).toInt))
// Total amount per customer: (name, total)
val sumData = tupleData.reduceByKey(_ + _)
val groupData = tupleData.groupByKey()
// Average order value per customer, rounded to 2 decimals: (name, average)
val avgData = groupData.mapValues(v => "%.2f".format(v.sum.toDouble / v.size).toDouble)
// join matches the two pair RDDs on the name: (name, (total, average))
val rsData = sumData.join(avgData)
val rsData = sumData.join(avgData)
rsData.foreach(println)
}
}
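join on pair RDDs is an inner join. Only names present in both sumData and avgData appear in the result. A key that occurs several times on either side produces every combination of its values. The nested-loop sketch below illustrates only these semantics on small local lists; Spark itself partitions both sides by key rather than comparing every pair. `JoinSketch` is a hypothetical name, and the output strings imitate the `(name,(total,average))` shape that ScalaDemo14 prints.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of pair-RDD inner join semantics on local lists
final class JoinSketch {
    static <V, W> List<String> join(List<Map.Entry<String, V>> left, List<Map.Entry<String, W>> right) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, V> l : left) {
            for (Map.Entry<String, W> r : right) {
                // Only keys present on both sides survive; duplicate keys yield every combination
                if (l.getKey().equals(r.getKey())) {
                    out.add("(" + l.getKey() + ",(" + l.getValue() + "," + r.getValue() + "))");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> totals = List.of(Map.entry("person1", 235), Map.entry("person2", 62));
        List<Map.Entry<String, Double>> averages = List.of(Map.entry("person1", 117.5), Map.entry("person3", 40.0));
        // prints [(person1,(235,117.5))]
        System.out.println(join(totals, averages));
    }
}
```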
WordCount
JavaWordCount
package com.lihaozhe.course03;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
/**
* @author 李昊哲
* @version 1.0.0 2023/5/17 4:16 PM
*/
public class JavaWordCount {
public static void main(String[] args) {
System.setProperty("HADOOP_USER_NAME", "root");
String appName = "WordCount";
// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
SparkConf conf = new SparkConf().setAppName(appName);
// Run locally
// conf.setMaster("local");
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
JavaRDD<String> javaRDD = sc.textFile("/data/words.txt");
JavaRDD<String> wordsRdd = javaRDD.flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).listIterator());
JavaPairRDD<String, Integer> javaPairRDD = wordsRdd.mapToPair((PairFunction<String, String, Integer>) word -> new Tuple2<>(word, 1));
JavaPairRDD<String, Integer> rs = javaPairRDD.reduceByKey((Function2<Integer, Integer, Integer>) Integer::sum);
rs.saveAsTextFile("/data/result");
sc.stop();
}
}
}
ScalaWordCount
package com.lihaozhe.course03
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author 李昊哲
* @version 1.0.0 2023/5/17 3:51 PM
*/
object ScalaWordCount01 {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
// val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val conf = new SparkConf().setAppName("WordCount")
// conf.setMaster("local")
val sc = new SparkContext(conf)
val content = sc.textFile("/data/words.txt")
// content.foreach(println)
val words = content.flatMap(_.split(" "))
// words.foreach(println)
// (love,Seq(love, love, love, love, love))
val wordGroup = words.groupBy(word => word)
// wordGroup.foreach(println)
// (love,5)
val wordCount = wordGroup.mapValues(_.size)
// wordCount.foreach(println)
wordCount.saveAsTextFile("/data/result");
sc.stop()
}
}
package com.lihaozhe.course03
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author 李昊哲
* @version 1.0.0 2023/5/17 3:51 PM
*/
object ScalaWordCount02 {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
// val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val conf = new SparkConf().setAppName("WordCount")
// conf.setMaster("local")
val sc = new SparkContext(conf)
val content = sc.textFile("/data/words.txt")
// content.foreach(println)
val words = content.flatMap(_.split(" "))
// words.foreach(println)
// (love,1)
val wordMap = words.map((_, 1))
// wordMap.foreach(println)
// (love,5)
val wordCount = wordMap.reduceByKey(_ + _)
//wordCount.foreach(println)
wordCount.saveAsTextFile("/data/result");
sc.stop()
}
}
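The two Scala programs reach the same counts by different routes. ScalaWordCount01 groups identical words and counts each group. ScalaWordCount02 maps every word to 1 and adds the ones with reduceByKey. The plain-Java sketch below runs both pipelines on an in-memory list so their results can be compared. `WordCountSketch` and its methods are hypothetical names, and a TreeMap is used only to make the printed order predictable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical local sketch of the two WordCount pipelines above
final class WordCountSketch {
    // ScalaWordCount01 style: flatMap(_.split(" ")), groupBy(word => word), mapValues(_.size)
    static Map<String, Integer> countByGrouping(List<String> lines) {
        Map<String, List<String>> groups = lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.toList()));
        Map<String, Integer> counts = new TreeMap<>();
        groups.forEach((word, occurrences) -> counts.put(word, occurrences.size()));
        return counts;
    }

    // ScalaWordCount02 style: map((_, 1)), then reduceByKey(_ + _)
    static Map<String, Integer> countByReducing(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toMap(Function.identity(), word -> 1, Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        List<String> lines = List.of("I love Spark", "I love Scala");
        // both print {I=2, Scala=1, Spark=1, love=2}
        System.out.println(countByGrouping(lines));
        System.out.println(countByReducing(lines));
    }
}
```

On a cluster the reduceByKey version is usually preferable. Each partition pre-aggregates its counts before the shuffle, instead of sending every occurrence of every word.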
Packaging and deploying the project
mvn package
Upload the jar file to the cluster
Submit the application on the cluster
Standalone client mode: spark-submit --master spark://spark01:7077 --deploy-mode client
Standalone cluster mode: spark-submit --master spark://spark01:7077 --deploy-mode cluster
YARN client mode: spark-submit --master yarn --deploy-mode client
YARN cluster mode: spark-submit --master yarn --deploy-mode cluster
spark-submit options such as --deploy-mode must come before the application jar; anything after the jar is passed to the application's main method as arguments.
spark-submit --master yarn --deploy-mode cluster --class com.lihaozhe.course03.JavaWordCount spark-code.jar
spark-submit --master yarn --deploy-mode cluster --class com.lihaozhe.course03.ScalaWordCount01 spark-code.jar
spark-submit --master yarn --deploy-mode cluster --class com.lihaozhe.course03.ScalaWordCount02 spark-code.jar
sparkSQL
package com.lihaozhe.course04
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* Build a DataFrame
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo01 {
def main(args: Array[String]): Unit = {
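// Basic configuration: fall back to local mode only when spark.master is not already set (for example by spark-submit)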
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
// Implicit conversions (encoders, toDF/toDS, $"column" syntax)
import spark.implicits._
// Read the CSV file into a DataFrame
val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/info.csv")
// root
// |-- _c0: string (nullable = true)
// |-- _c1: string (nullable = true)
df.printSchema()
spark.stop()
}
}
package com.lihaozhe.course04
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* show
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo02 {
def main(args: Array[String]): Unit = {
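// Basic configuration: fall back to local mode only when spark.master is not already set (for example by spark-submit)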
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
// Implicit conversions (encoders, toDF/toDS, $"column" syntax)
import spark.implicits._
// Read the CSV file into a DataFrame
val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/info.csv")
df.show()
spark.stop()
}
}
package com.lihaozhe.course04
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* option("header", ...): whether to use the first line as column names; header defaults to false
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo03 {
def main(args: Array[String]): Unit = {
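// Basic configuration: fall back to local mode only when spark.master is not already set (for example by spark-submit)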
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
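// Implicit conversions (encoders, toDF/toDS, $"column" syntax)
import spark.implicits._
// Read the CSV file into a DataFrame
val df = spark.read
.option("header", "true") // use the first line as column names; header defaults to false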
.csv("file:///home/lsl/IdeaProjects/spark-code/info.csv")
// root
// |-- name: string (nullable = true)
// |-- amount: string (nullable = true)
df.printSchema()
df.show()
spark.stop()
}
}
package com.lihaozhe.course04
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* select
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo04 {
def main(args: Array[String]): Unit = {
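// Basic configuration: fall back to local mode only when spark.master is not already set (for example by spark-submit)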
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
// Implicit conversions (encoders, toDF/toDS, $"column" syntax)
import spark.implicits._
// Read the CSV file into a DataFrame
val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
// root
// |-- _c0: string (nullable = true)
// |-- _c1: string (nullable = true)
df.printSchema()
df.select("_c0", "_c1").show()
spark.stop()
}
}
package com.lihaozhe.course04
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* withColumnRenamed
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo05 {
def main(args: Array[String]): Unit = {
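// Basic configuration: fall back to local mode only when spark.master is not already set (for example by spark-submit)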
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
// Implicit conversions (encoders, toDF/toDS, $"column" syntax)
import spark.implicits._
// Read the CSV file into a DataFrame
val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
df.withColumnRenamed("_c0", "name")
.withColumnRenamed("_c1", "amount")
.printSchema()
// root
// |-- name: string (nullable = true)
// |-- amount: string (nullable = true)
spark.stop()
}
}
package com.lihaozhe.course04
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}
/**
* col("originalName").cast(dataType).as("newName"): cast a column to another type and rename it
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo06 {
def main(args: Array[String]): Unit = {
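// Basic configuration: fall back to local mode only when spark.master is not already set (for example by spark-submit)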
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
// Implicit conversions (encoders, toDF/toDS, $"column" syntax)
import spark.implicits._
// Read the CSV file into a DataFrame
val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
df.select(
col("_c0").cast(StringType).as("name"),
col("_c1").cast(IntegerType).as("amount"),
).show()
spark.stop()
}
}
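When ANSI mode is off (`spark.sql.ansi.enabled=false`, the default in Spark 3.4), `cast(IntegerType)` does not fail the job on a string that is not a valid integer. It yields null instead, and leading and trailing whitespace is trimmed before parsing. The plain-Java sketch below is a simplified model of that behavior for plain integer strings only and ignores other edge cases. `CastSketch` is a hypothetical name.

```java
// Hypothetical, simplified model of Spark's default (non-ANSI) string-to-int cast
final class CastSketch {
    // Returns null instead of throwing when the text is not an integer,
    // mirroring how col("_c1").cast(IntegerType) treats bad values with ANSI mode off
    static Integer castToInt(String text) {
        if (text == null) {
            return null;
        }
        try {
            // String.trim removes characters <= ' ', like Spark's whitespace trimming
            return Integer.parseInt(text.trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // prints 137, then null twice
        System.out.println(castToInt(" 137 "));
        System.out.println(castToInt("amount"));
        System.out.println(castToInt(null));
    }
}
```

For example, if info.csv were read without the header option, its header line would become an ordinary data row. Its amount value would then cast to null.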
package com.lihaozhe.course04
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
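* topN
* show(numRows, truncate = false): prints the first 20 rows by default; numRows is the number of rows, truncate controls whether long values are cut off (default true = truncate, false = show in full)
* first(): the first row
* take(n): the first n rows
* head(n): the first n rows (same result as take(n))
* tail(n): the last n rows
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM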
*/
object ScalaDemo07 {
def main(args: Array[String]): Unit = {
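// Basic configuration: fall back to local mode only when spark.master is not already set (for example by spark-submit)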
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
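// Implicit conversions (encoders, toDF/toDS, $"column" syntax)
import spark.implicits._
// Read the CSV file into a DataFrame
val df = spark.read
.option("header", "true") // use the first line as column names; header defaults to false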
.csv("file:///home/lsl/IdeaProjects/spark-code/info.csv")
// df.show(5,truncate = false)
// println(df.first())
// df.take(3).foreach(println)
// df.head(3).foreach(println)
df.tail(3).foreach(println)
spark.stop()
}
}
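The list in the ScalaDemo07 comment can be modeled on an ordered local list. first() returns one row, take(n) and head(n) return the first n rows, and tail(n) returns the last n. The sketch below is plain Java with hypothetical names (`TopNSketch`). In Spark, take, head and tail all bring rows back to the driver, so n should stay small.

```java
import java.util.List;

// Hypothetical local model of first(), take(n)/head(n) and tail(n) on an ordered list of rows
final class TopNSketch {
    static <T> T first(List<T> rows) {
        return rows.get(0);
    }

    // take(n) and head(n) both return the first n rows (fewer if the data is shorter)
    static <T> List<T> take(List<T> rows, int n) {
        return rows.subList(0, Math.min(n, rows.size()));
    }

    // tail(n) returns the last n rows
    static <T> List<T> tail(List<T> rows, int n) {
        return rows.subList(Math.max(0, rows.size() - n), rows.size());
    }

    public static void main(String[] args) {
        List<String> rows = List.of("[person1,197]", "[person2,50]", "[person3,137]", "[person1,38]");
        System.out.println(take(rows, 3)); // [[person1,197], [person2,50], [person3,137]]
        System.out.println(tail(rows, 2)); // [[person3,137], [person1,38]]
    }
}
```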
package com.lihaozhe.course04
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}
/**
* where: filter rows by a condition (SQL expression string)
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo08 {
def main(args: Array[String]): Unit = {
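// Basic configuration: fall back to local mode only when spark.master is not already set (for example by spark-submit)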
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
// Implicit conversions (encoders, toDF/toDS, $"column" syntax)
import spark.implicits._
// Read the CSV file into a DataFrame
val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
df.select(
col("_c0").cast(StringType).as("name"),
col("_c1").cast(IntegerType).as("amount"),
).where("amount > 100").show()
spark.stop()
}
}
package com.lihaozhe.course04
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}
/**
* where: filter rows by a condition (Column expression)
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo09 {
def main(args: Array[String]): Unit = {
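// Basic configuration: fall back to local mode only when spark.master is not already set (for example by spark-submit)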
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
// Implicit conversions (encoders, toDF/toDS, $"column" syntax)
import spark.implicits._
// Read the CSV file into a DataFrame
val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
df.select(
col("_c0").cast(StringType).as("name"),
col("_c1").cast(IntegerType).as("amount"),
).where(col("amount") > 100).show()
spark.stop()
}
}
package com.lihaozhe.course04
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}
/**
* filter: filter rows by a condition (SQL expression string); where is an alias of filter
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo10 {
def main(args: Array[String]): Unit = {
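// Basic configuration: fall back to local mode only when spark.master is not already set (for example by spark-submit)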
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
// Implicit conversions (encoders, toDF/toDS, $"column" syntax)
import spark.implicits._
// Read the CSV file into a DataFrame
val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
df.select(
col("_c0").cast(StringType).as("name"),
col("_c1").cast(IntegerType).as("amount"),
).filter("amount > 100").show()
spark.stop()
}
}
package com.lihaozhe.course04
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}
/**
* filter: filter rows by a condition (Column expression)
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo11 {
def main(args: Array[String]): Unit = {
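// Basic configuration: fall back to local mode only when spark.master is not already set (for example by spark-submit)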
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
// Implicit conversions (encoders, toDF/toDS, $"column" syntax)
import spark.implicits._
// Read the CSV file into a DataFrame
val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
df.select(
col("_c0").cast(StringType).as("name"),
col("_c1").cast(IntegerType).as("amount"),
).filter(col("amount") > 100).show()
spark.stop()
}
}
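ScalaDemo08 to ScalaDemo11 differ only in syntax. where is an alias of filter, and both accept either a SQL expression string or a Column condition. SQL null handling is worth keeping in mind here. If a row's amount is null (for example after a failed cast), `amount > 100` evaluates to null, so that row is dropped. The plain-Java sketch below models that predicate. `FilterSketch` and `Order` are hypothetical names.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical local model of where("amount > 100") / filter(col("amount") > 100)
final class FilterSketch {
    // One row after the select/cast step: a name and a nullable amount
    static final class Order {
        final String name;
        final Integer amount;
        Order(String name, Integer amount) { this.name = name; this.amount = amount; }
        @Override public String toString() { return "[" + name + "," + amount + "]"; }
    }

    // In SQL, null > 100 is null rather than true, so rows with a null amount are dropped
    static final Predicate<Order> AMOUNT_OVER_100 = o -> o.amount != null && o.amount > 100;

    static List<Order> where(List<Order> rows, Predicate<Order> condition) {
        return rows.stream().filter(condition).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Order> rows = List.of(new Order("person1", 197), new Order("person2", 50), new Order("person3", null));
        System.out.println(where(rows, AMOUNT_OVER_100)); // [[person1,197]]
    }
}
```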
package com.lihaozhe.course04
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}
/**
* groupBy count: group rows and aggregate
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo12 {
def main(args: Array[String]): Unit = {
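// Basic configuration: fall back to local mode only when spark.master is not already set (for example by spark-submit)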
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
// Implicit conversions (encoders, toDF/toDS, $"column" syntax)
import spark.implicits._
// Read the CSV file into a DataFrame
val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
df.select(
col("_c0").cast(StringType).as("name"),
col("_c1").cast(IntegerType).as("amount"),
).groupBy(col("name")).count().show()
spark.stop()
}
}
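groupBy(col("name")).count() returns one row per name, with a column named count. The plain-Java sketch below computes the same numbers from a list of `name,amount` lines. `GroupByCountSketch` is a hypothetical name. For other aggregates, the DataFrame API offers `agg`, for example `groupBy("name").agg(sum("amount"))` with `org.apache.spark.sql.functions.sum`.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical local model of groupBy(col("name")).count()
final class GroupByCountSketch {
    // Count how many order lines each name has (Spark names the result column "count")
    static Map<String, Long> countByName(List<String> lines) {
        return lines.stream()
                .map(line -> line.split(",")[0])
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // prints {person1=2, person2=1}
        System.out.println(countByName(List.of("person1,197", "person2,50", "person1,38")));
    }
}
```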
package com.lihaozhe.course04
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* SQLContext
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo13 {
def main(args: Array[String]): Unit = {
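// Basic configuration: fall back to local mode only when spark.master is not already set (for example by spark-submit)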
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
// Implicit conversions (encoders, toDF/toDS, $"column" syntax)
import spark.implicits._
// Read the CSV file into a DataFrame
val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
// withColumnRenamed returns a new DataFrame, so keep the result
val orderDf = df.withColumnRenamed("_c0", "name")
.withColumnRenamed("_c1", "amount")
// Register the DataFrame as a temporary view
orderDf.createOrReplaceTempView("order_info")
// Get the SQLContext object
val sqlContext = spark.sqlContext
sqlContext.sql("select name, amount from order_info where cast(amount as int) > 100").show(false)
spark.stop()
}
}
package com.lihaozhe.course05
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* DataFrame and Dataset
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo01 {
def main(args: Array[String]): Unit = {
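// Basic configuration: fall back to local mode only when spark.master is not already set (for example by spark-submit)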
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
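// Implicit conversions (encoders, toDF/toDS, $"column" syntax)
import spark.implicits._
// Read the CSV file into a DataFrame
val df = spark.read
.option("header", "true") // use the first line as column names; header defaults to false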
.csv("file:///home/lsl/IdeaProjects/spark-code/info.csv")
val ds = df.as[OrderInfo]
val rdd = ds.map(orderInfo => (orderInfo.name, orderInfo.amount.toInt)).rdd
rdd.reduceByKey(_ + _).foreach(println)
spark.stop()
}
}
case class OrderInfo(name: String, amount: String)
package com.lihaozhe.course05
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* Create a Dataset
* spark.read.text returns a DataFrame
* spark.read.textFile returns a Dataset[String]
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo02 {
def main(args: Array[String]): Unit = {
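// Basic configuration: fall back to local mode only when spark.master is not already set (for example by spark-submit)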
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
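// Implicit conversions (encoders, toDF/toDS, $"column" syntax)
import spark.implicits._
// Read the CSV file as lines of text: text returns a DataFrame, textFile a Dataset[String]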
val df = spark.read.text("file:///home/lsl/IdeaProjects/spark-code/data.csv")
val ds = spark.read.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
spark.stop()
}
}
package com.lihaozhe.course05
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* Data read directly with spark.read.text or spark.read.textFile has a single String column named value
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo03 {
def main(args: Array[String]): Unit = {
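// Basic configuration: fall back to local mode only when spark.master is not already set (for example by spark-submit)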
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
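// Implicit conversions (encoders, toDF/toDS, $"column" syntax)
import spark.implicits._
// Read the CSV file as lines of text: text returns a DataFrame, textFile a Dataset[String]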
val df = spark.read.text("file:///home/lsl/IdeaProjects/spark-code/data.csv")
val ds = spark.read.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
df.show(5, truncate = false)
ds.show(5, truncate = false)
spark.stop()
}
}
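Since text and textFile leave each line as a single `value` string, the fields still have to be split out by hand. A stdlib-only sketch of that parsing step, assuming each line has the form `name,amount` (inferred from how the later examples split data.csv) and choosing to skip malformed lines rather than fail; the `Order` class and sample lines are hypothetical:

```scala
import scala.util.Try

object ValueColumnSketch {
  // Hypothetical typed record for one parsed line
  final case class Order(name: String, amount: Int)

  // Parse one raw "value" string; return None when the line does not have
  // exactly two fields or the amount is not an integer
  def parse(line: String): Option[Order] = line.split(",") match {
    case Array(name, amount) =>
      Try(amount.trim.toInt).toOption.map(a => Order(name.trim, a))
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical lines as they would appear in the value column
    val lines = Seq("zhangsan,100", "lisi, 200", "broken line")
    lines.flatMap(parse).foreach(println)
  }
}
```

The same `parse` function could be passed to `flatMap` on the Dataset[String] returned by textFile, with `import spark.implicits._` in scope for the case-class encoder.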
package com.lihaozhe.course05
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* Interoperating with RDDs
* When there are only a few fields, infer the RDD schema by reflection from a case class
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo04 {
def main(args: Array[String]): Unit = {
// basic configuration
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
// implicit conversions (needed for toDF() and the Dataset map calls)
import spark.implicits._
// Create an RDD of OrderSchema objects from a text file and convert it to a DataFrame
val OrderDf = spark.sparkContext
.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
.map(_.split(","))
.map(attributes => OrderSchema(attributes(0), attributes(1).trim.toInt))
.toDF()
// Register the DataFrame as a temporary view
OrderDf.createOrReplaceTempView("order_info")
// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, amount FROM order_info WHERE amount BETWEEN 100 AND 150")
teenagersDF.map(teenager => "Name: " + teenager(0)).show(3, truncate = false)
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show(3, truncate = false)
// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
// read one row at a time and wrap its values in a Map
// Primitive types and case classes can also be defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
implicit val mapEncoder: org.apache.spark.sql.Encoder[Map[String, Any]] = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
val array = teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "amount"))).collect()
array.foreach(println)
teenagersDF.toJSON.show(3, truncate = false)
spark.stop()
}
}
case class OrderSchema(name: String, amount: Int)
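ScalaDemo04 relies on Spark inferring the schema by reflection: the column names come from the constructor parameters of the OrderSchema case class, and the SQL then keeps rows whose amount lies in an inclusive range. A stdlib-only sketch of both ideas on local data; note that Spark itself derives the schema through Scala reflection (TypeTags), not through `productElementNames`, which is used here only to show which names end up as columns. The object name and sample rows are hypothetical:

```scala
object ReflectionSchemaSketch {
  // Same shape as the OrderSchema case class in ScalaDemo04
  final case class OrderSchema(name: String, amount: Int)

  // Constructor parameter names (Scala 2.13), i.e. the names that become columns
  def columnNames(p: Product): List[String] = p.productElementNames.toList

  // Local equivalent of WHERE amount BETWEEN 100 AND 150; BETWEEN includes both ends
  def between(orders: Seq[OrderSchema], lo: Int, hi: Int): Seq[OrderSchema] =
    orders.filter(o => o.amount >= lo && o.amount <= hi)

  def main(args: Array[String]): Unit = {
    // Hypothetical rows
    val orders = Seq(OrderSchema("a", 99), OrderSchema("b", 100),
      OrderSchema("c", 150), OrderSchema("d", 151))
    println(columnNames(orders.head))
    between(orders, 100, 150).foreach(o => println("Name: " + o.name))
  }
}
```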
package com.lihaozhe.course05
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
/**
* Interoperating with RDDs
* Programmatically specifying the schema, for when a case class cannot be defined ahead of time:
* 1. Create an RDD of Rows from the original RDD;
* 2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1;
* 3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession.
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo05 {
def main(args: Array[String]): Unit = {
// basic configuration
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
// implicit conversions
import spark.implicits._
val sc = spark.sparkContext
val ds = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
// 1. Create an RDD of Rows from the original RDD
val rowRDD = ds.map(_.split(",")).map(content => Row(content(0), content(1).toInt))
// 2. Create the schema represented by a StructType matching the structure of the Rows created in step 1
val struct = StructType(Array(
StructField("name", StringType, nullable = true),
StructField("amount", IntegerType, nullable = true)
))
// 3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession
val df = spark.createDataFrame(rowRDD, struct)
df.printSchema()
// root
// |-- name: string (nullable = true)
// |-- amount: integer (nullable = true)
spark.stop()
}
}
package com.lihaozhe.course05
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
/**
* Interoperating with RDDs
* Programmatically specifying the schema from a schema string (every column becomes StringType):
* 1. Create an RDD of Rows from the original RDD;
* 2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1;
* 3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession.
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo06 {
def main(args: Array[String]): Unit = {
// basic configuration
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
val sc = spark.sparkContext
val ds = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
// 1. Create an RDD of Rows from the original RDD; amount stays a String to match the all-StringType schema below
val rowRDD = ds.map(_.split(",")).map(content => Row(content(0), content(1).trim))
// 2. Create the schema represented by a StructType matching the structure of the Rows created in step 1
val schemaString = "name amount"
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// 3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession
val df = spark.createDataFrame(rowRDD, schema)
df.printSchema()
// root
// |-- name: string (nullable = true)
// |-- amount: string (nullable = true)
spark.stop()
}
}
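The schema string above turns every field into StringType, which is why printSchema reports amount as string. If typed columns are wanted while keeping the string-driven approach, one option is to encode the type in the string itself. A stdlib-only sketch, assuming a hypothetical `name:type` syntax and stand-in type objects (`StrCol`, `IntCol`) in place of Spark's StringType and IntegerType:

```scala
object SchemaStringSketch {
  // Hypothetical stand-ins for Spark's StringType / IntegerType
  sealed trait ColType
  case object StrCol extends ColType
  case object IntCol extends ColType

  final case class Field(name: String, colType: ColType)

  // Parse e.g. "name:string amount:int"; a field without ":type" defaults to string,
  // which is what the schemaString example does for every field
  def parseSchema(spec: String): Seq[Field] =
    spec.trim.split("\\s+").toSeq.map { token =>
      token.split(":", 2) match {
        case Array(n, "int")    => Field(n, IntCol)
        case Array(n, "string") => Field(n, StrCol)
        case Array(n)           => Field(n, StrCol)
        case _ => throw new IllegalArgumentException(s"unsupported field spec: $token")
      }
    }

  def main(args: Array[String]): Unit =
    parseSchema("name:string amount:int").foreach(println)
}
```

In a Spark program each `Field` would then be mapped to `StructField(name, StringType, nullable = true)` or `StructField(name, IntegerType, nullable = true)`, and the Row values would need to be converted to the matching types.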
package com.lihaozhe.course06
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
* DataSource csv parquet
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo01 {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
// basic configuration
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
// implicit conversions
import spark.implicits._
val df = spark.read.option("header", "true").format("csv").load("file:///home/lsl/IdeaProjects/spark-code/info.csv")
df.select("name", "amount").write.mode(SaveMode.Overwrite).format("parquet").save("/data/spark/parquet")
spark.stop()
}
}
package com.lihaozhe.course06
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
* DataSource parquet json
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo02 {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
// basic configuration
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
// implicit conversions
import spark.implicits._
// Parquet files carry their own schema, so no header option is needed
val df = spark.read.format("parquet").load("/data/spark/parquet")
df.select("name", "amount").write.mode(SaveMode.Overwrite).format("json").save("/data/spark/json")
spark.stop()
}
}
package com.lihaozhe.course06
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
/**
* DataSource JDBC MySQL Load Save
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo03 {
def main(args: Array[String]): Unit = {
// basic configuration
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
// implicit conversions
import spark.implicits._
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://spark03")
.option("dbtable", "lihaozhe.dujitang")
.option("user", "root")
.option("password", "Lihaozhe!!@@1122")
.load()
jdbcDF.show()
spark.stop()
}
}
package com.lihaozhe.course06
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import java.util.Properties
/**
* DataSource JDBC MySQL
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo04 {
def main(args: Array[String]): Unit = {
// basic configuration
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
// implicit conversions
import spark.implicits._
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "Lihaozhe!!@@1122")
val jdbcDF = spark.read
.jdbc("jdbc:mysql://spark03", "lihaozhe.dujitang", connectionProperties)
jdbcDF.show()
spark.stop()
}
}
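The JDBC examples hard-code the MySQL password in source. One common alternative is to build the `Properties` object from environment variables. A minimal sketch, assuming hypothetical variable names `MYSQL_USER` and `MYSQL_PASSWORD` (nothing in the original setup defines them) and falling back to `root` for the user as in the examples:

```scala
import java.util.Properties
import scala.util.{Failure, Success, Try}

object JdbcPropsSketch {
  // Hypothetical environment variable names
  val UserVar = "MYSQL_USER"
  val PasswordVar = "MYSQL_PASSWORD"

  // Build connection properties from an environment-like map; fail fast
  // when the password is missing instead of connecting with an empty one
  def jdbcProps(env: Map[String, String]): Properties = {
    val password = env.getOrElse(PasswordVar,
      throw new IllegalStateException(s"$PasswordVar is not set"))
    val props = new Properties()
    props.setProperty("user", env.getOrElse(UserVar, "root"))
    props.setProperty("password", password)
    props
  }

  def main(args: Array[String]): Unit =
    Try(jdbcProps(sys.env)) match {
      case Success(p) => println(s"connecting as ${p.getProperty("user")}")
      case Failure(e) => println(e.getMessage)
    }
}
```

The returned object could then be passed as the third argument of `spark.read.jdbc(url, table, properties)` in place of `connectionProperties`.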
package com.lihaozhe.course06
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import java.util.Properties
/**
* DataSource JDBC MySQL custom column types
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo05 {
def main(args: Array[String]): Unit = {
// basic configuration
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
// implicit conversions
import spark.implicits._
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "Lihaozhe!!@@1122")
// custom column types: read both id and content as STRING
connectionProperties.put("customSchema", "id STRING, content STRING")
val jdbcDF = spark.read
.jdbc("jdbc:mysql://spark03", "lihaozhe.dujitang", connectionProperties)
jdbcDF.printSchema()
spark.stop()
}
}
package com.lihaozhe.course06
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
/**
* DataSource JDBC MySQL Load Save
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo06 {
def main(args: Array[String]): Unit = {
// basic configuration
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
// implicit conversions
import spark.implicits._
val sc = spark.sparkContext
val ds = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
val rowRDD = ds.map(_.split(",")).map(content => Row(content(0), content(1).toInt))
val struct = StructType(Array(
StructField("name", StringType, nullable = true),
StructField("amount", IntegerType, nullable = true)
))
val df = spark.createDataFrame(rowRDD, struct)
df.write
.format("jdbc")
.option("url", "jdbc:mysql://spark03")
.option("dbtable", "lihaozhe.data")
.option("user", "root")
.option("password", "Lihaozhe!!@@1122")
.mode(SaveMode.Append)
.save()
spark.stop()
}
}
package com.lihaozhe.course06
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import java.util.Properties
/**
* DataSource JDBC MySQL Load Save
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo07 {
def main(args: Array[String]): Unit = {
// basic configuration
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
// implicit conversions
import spark.implicits._
val sc = spark.sparkContext
val ds = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
val rowRDD = ds.map(_.split(",")).map(content => Row(content(0), content(1).toInt))
val struct = StructType(Array(
StructField("name", StringType, nullable = true),
StructField("amount", IntegerType, nullable = true)
))
val df = spark.createDataFrame(rowRDD, struct)
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "Lihaozhe!!@@1122")
df.write
.mode(SaveMode.Append)
.jdbc("jdbc:mysql://spark03", "lihaozhe.data", connectionProperties)
spark.stop()
}
}
package com.lihaozhe.course06
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import java.util.Properties
/**
* DataSource JDBC MySQL Load Save
*
* @author 李昊哲
* @version 1.0.0 2023/5/18 8:30 AM
*/
object ScalaDemo08 {
def main(args: Array[String]): Unit = {
// basic configuration
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
// implicit conversions
import spark.implicits._
val sc = spark.sparkContext
val ds = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
val rowRDD = ds.map(_.split(",")).map(content => Row(content(0), content(1).toInt))
val struct = StructType(Array(
StructField("name", StringType, nullable = true),
StructField("amount", IntegerType, nullable = true)
))
val df = spark.createDataFrame(rowRDD, struct)
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "Lihaozhe!!@@1122")
df.write
.mode(SaveMode.Append)
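// createTableColumnTypes only applies when Spark creates the table; it has no effect when appending to an existing table
.option("createTableColumnTypes", "name VARCHAR(33)")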
.jdbc("jdbc:mysql://spark03", "lihaozhe.data", connectionProperties)
spark.stop()
}
}
hdfs dfs -mkdir -p /lihaozhe
hdfs dfs -mkdir -p /hive/info
create database lihaozhe location '/lihaozhe';
use lihaozhe;
create external table `info`(
name string comment 'name',
amount int comment 'amount'
) comment 'order table'
row format delimited fields terminated by ','
lines terminated by '\n'
stored as textfile
location '/hive/info';
load data local inpath '/root/data.csv' into table info;
package com.lihaozhe.course07
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* Spark on Hive
* @author 李昊哲
* @version 1.0.0 2023/5/18 4:07 PM
*/
object ScalaHiveSource {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
// basic configuration
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local")
}
val spark = SparkSession
.builder()
.appName("Spark SQL hive example")
.config(conf)
.enableHiveSupport()
.getOrCreate()
// implicit conversions
import spark.implicits._
import spark.sql
sql("select * from lihaozhe.info").show()
spark.stop()
}
}
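Error message: Couldn’t create directory /user/hive/resources
Cause
The error comes from the hive.downloaded.resources.dir setting in hive-site.xml:
the directory it points to, /user/hive/resources,
cannot be created locally.
- Spark SQL uses the settings in hive-site.xml to connect to the metastore database,
and hdfs-site.xml and core-site.xml to connect to HDFS.
- The /user/hive/resources directory from that configuration is created on the machine running IDEA,
i.e. the local Windows, macOS, or Linux machine, not on the remote server.
Because /user/hive/resources cannot be created in the local file system, the error is raised.
Fix for org.apache.spark.sql.AnalysisException: java.lang.RuntimeException: Couldn’t create directory /user/hive/resources
Create the directory on the local file system
macOS or Linux
sudo mkdir -p /user/hive/resources
Windows
Create the directory on the drive that holds the project;
for example, if the project is on drive D, create d:\user\hive\resources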
md d:\user\hive\resources
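The same directory could also be created from the driver program before the SparkSession with enableHiveSupport() is built. A minimal sketch using java.nio; the object name and the command-line override are hypothetical, and creating /user at the filesystem root may still need elevated permissions, just as with sudo above:

```scala
import java.nio.file.{Files, Path, Paths}
import scala.util.{Failure, Success, Try}

object HiveResourcesDirSketch {
  // Create the directory and any missing parents; Files.createDirectories
  // returns normally when the directory already exists
  def ensureDir(dir: Path): Path = Files.createDirectories(dir)

  def main(args: Array[String]): Unit = {
    // Default mirrors the path from the error message; a first argument overrides it
    val target = Paths.get(args.headOption.getOrElse("/user/hive/resources"))
    Try(ensureDir(target)) match {
      case Success(d) => println(s"ready: $d")
      case Failure(e) => println(s"could not create $target: $e")
    }
  }
}
```

On Windows, a path such as /user/hive/resources has no drive letter and should resolve against the drive of the current working directory, which is consistent with creating it on the project's drive.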
This article is reposted from: 学新通技术网