目录
一、数据准备
1)Department
2)School
3)Student
4)Teacher
5)实例化对象
结构如下:
二、加载数据
数据展示
三、日志数据加载
输出结果
一、数据准备
1)Department
package org.example.jsonre;
/**
 * Bean describing a department: a name plus a free-text description.
 * In the demo it is attached to a Teacher and serialized as the "dept" JSON object.
 */
public class Department {

    private String name;
    private String describe;

    /** Creates a department whose fields are all left null (populate via setters). */
    public Department() {
    }

    /**
     * Creates a fully populated department.
     *
     * @param name     department name
     * @param describe description of the department's work
     */
    public Department(String name, String describe) {
        this.name = name;
        this.describe = describe;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getDescribe() {
        return describe;
    }

    public void setDescribe(String describe) {
        this.describe = describe;
    }

    /** Debug form, e.g. {@code Department{name='x', describe='y'}}; null fields print as 'null'. */
    @Override
    public String toString() {
        return String.format("Department{name='%s', describe='%s'}", name, describe);
    }
}
2)School
package org.example.jsonre;
/**
 * Bean describing a school: its name, its leader and its address.
 * In the demo each Student carries one, serialized as the nested "school" JSON object.
 */
public class School {

    private String name;
    private String leader;
    private String address;

    /** Creates a school whose fields are all left null (populate via setters). */
    public School() {
    }

    /**
     * Creates a fully populated school.
     *
     * @param name    school name
     * @param leader  name of the school's leader
     * @param address school address
     */
    public School(String name, String leader, String address) {
        this.name = name;
        this.leader = leader;
        this.address = address;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getLeader() {
        return leader;
    }

    public void setLeader(String leader) {
        this.leader = leader;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    /** Debug form, e.g. {@code School{name='a', leader='b', address='c'}}; null fields print as 'null'. */
    @Override
    public String toString() {
        return String.format("School{name='%s', leader='%s', address='%s'}", name, leader, address);
    }
}
3)Student
package org.example.jsonre;
import com.alibaba.fastjson.JSON;
import java.io.*;
import java.util.ArrayList;
/**
 * Bean describing a student: numeric id, name, grade and the school attended.
 * In the demo a Teacher holds a list of these, serialized as the "stu" JSON array.
 */
public class Student {

    private Integer id;
    private String name;
    private String grade;
    private School school;

    /** Creates a student whose fields are all left null (populate via setters). */
    public Student() {
    }

    /**
     * Creates a fully populated student.
     *
     * @param id     student id
     * @param name   student name
     * @param grade  grade label
     * @param school school the student attends
     */
    public Student(Integer id, String name, String grade, School school) {
        this.id = id;
        this.name = name;
        this.grade = grade;
        this.school = school;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getGrade() {
        return grade;
    }

    public void setGrade(String grade) {
        this.grade = grade;
    }

    public School getSchool() {
        return school;
    }

    public void setSchool(School school) {
        this.school = school;
    }

    /** Debug form; the nested school uses its own toString, null fields print as null. */
    @Override
    public String toString() {
        return String.format("Student{id=%s, name='%s', grade='%s', school=%s}", id, name, grade, school);
    }
}
4)Teacher
package org.example.jsonre;
import java.util.ArrayList;
/**
 * Root bean of the JSON demo: a teacher with contact details, a department
 * ("dept") and the students they teach ("stu").
 */
public class Teacher {

    private Integer id;
    private String name;
    private String tel;
    private String email;
    private Department dept;
    private ArrayList<Student> stu;

    /** Creates a teacher whose fields are all left null (populate via setters). */
    public Teacher() {
    }

    /**
     * Creates a fully populated teacher.
     *
     * @param id    teacher id
     * @param name  teacher name
     * @param tel   telephone number
     * @param email e-mail address
     * @param dept  department the teacher belongs to
     * @param stu   students taught by this teacher (stored as-is, not copied)
     */
    public Teacher(Integer id, String name, String tel, String email, Department dept, ArrayList<Student> stu) {
        this.id = id;
        this.name = name;
        this.tel = tel;
        this.email = email;
        this.dept = dept;
        this.stu = stu;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getTel() {
        return tel;
    }

    public void setTel(String tel) {
        this.tel = tel;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

    public Department getDept() {
        return dept;
    }

    public void setDept(Department dept) {
        this.dept = dept;
    }

    public ArrayList<Student> getStu() {
        return stu;
    }

    public void setStu(ArrayList<Student> stu) {
        this.stu = stu;
    }

    /** Debug form; dept and stu use their own toString, null fields print as null. */
    @Override
    public String toString() {
        return String.format("Teacher{id=%s, name='%s', tel='%s', email='%s', dept=%s, stu=%s}",
                id, name, tel, email, dept, stu);
    }
}
5)实例化对象
/**
 * Builds a sample Teacher object graph (department, three students, each with a school),
 * serializes it to JSON with fastjson, prints the JSON and writes it to out/jsonTest.txt.
 */
public class Test {

    /** Destination of the serialized JSON, relative to the working directory. */
    private static final String OUTPUT_PATH = "out/jsonTest.txt";

    public static void main(String[] args) {
        // Data setup
        Department department = new Department("学术部", "主要负责教学研究");

        // Each student deliberately gets its own School instance: fastjson (1.x) by default
        // writes a repeated reference to the same object as {"$ref": ...} instead of
        // repeating its fields, which the downstream JSON parsing does not expect.
        Student stu1 = new Student(1, "张三", "一年级", newSchool());
        Student stu2 = new Student(2, "李四", "一年级", newSchool());
        Student stu3 = new Student(3, "赵六", "二年级", newSchool());

        ArrayList<Student> students = new ArrayList<>();
        students.add(stu1);
        students.add(stu2);
        students.add(stu3);

        // id is intentionally left unset (null), so it is omitted from the JSON output.
        Teacher teacher = new Teacher();
        teacher.setDept(department);
        teacher.setStu(students);
        teacher.setEmail("fivedessert@gmail.cn");
        teacher.setName("five小点心");
        teacher.setTel("1231231234");

        String jsonString = JSON.toJSONString(teacher);
        System.out.println(jsonString);

        java.nio.file.Path target = java.nio.file.Paths.get(OUTPUT_PATH);
        try {
            // Create "out/" if needed. Previously a missing directory made the FileWriter
            // constructor fail, leaving the writer null and crashing on close().
            java.nio.file.Path parent = target.getParent();
            if (parent != null) {
                java.nio.file.Files.createDirectories(parent);
            }
            // Encode explicitly as UTF-8. The JSON contains Chinese text, and FileWriter would use
            // the platform default charset (e.g. GBK on Chinese Windows before Java 18).
            // Spark's textFile, however, decodes input as UTF-8.
            // Files.write opens, truncates, writes and closes the file itself.
            java.nio.file.Files.write(target, jsonString.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Returns a new, identically populated School; a fresh instance per call (see note in main). */
    private static School newSchool() {
        return new School("南京某大学", "王德发", "安德门");
    }
}
{"dept":{"describe":"主要负责教学研究","name":"学术部"},"email":"fivedessert@gmail.cn","name":"five小点心","stu":[{"grade":"一年级","id":1,"name":"张三","school":{"address":"安德门","leader":"王德发","name":"南京某大学"}},{"grade":"一年级","id":2,"name":"李四","school":{"address":"安德门","leader":"王德发","name":"南京某大学"}},{"grade":"二年级","id":3,"name":"赵六","school":{"address":"安德门","leader":"王德发","name":"南京某大学"}}],"tel":"1231231234"}
结构如下:
二、加载数据
使用 sc.textFile 读取数据源,再将其中的 JSON 结构化数据逐层拆分。
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Spark demo: reads the one-line teacher JSON produced by the Java `Test` program and
 * flattens it step by step (jsonobj1 .. jsonobj8) into one row per student, printing the
 * schema and contents after every step.
 *
 * Usage: `JsonTest [inputPath]`; the input path defaults to `out/jsonTest.txt`.
 */
object JsonTest {

  /** Input file used when no path is given on the command line. */
  private val DefaultInputPath = "out/jsonTest.txt"

  def main(args: Array[String]): Unit = {
    // Spark configuration: local mode
    val conf = new SparkConf().setMaster("local").setAppName("JsonTest")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // Stop in finally so the session is released even if a step fails. stop() also stops
    // the underlying SparkContext, so a separate sc.stop() is unnecessary.
    try run(spark, args.headOption.getOrElse(DefaultInputPath))
    finally spark.stop()
  }

  /** Runs every flattening step against the JSON lines found at `inputPath`. */
  private def run(spark: SparkSession, inputPath: String): Unit = {
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    import spark.implicits._

    val sc = spark.sparkContext

    // jsonobj1: raw RDD[String], one element per line of the input file
    val jsonobj1 = sc.textFile(inputPath)
    jsonobj1.collect().foreach(println)

    // jsonobj2: the RDD as a one-column DataFrame; column "teacher" holds the JSON text
    val jsonobj2 = jsonobj1.toDF("teacher")
    jsonobj2.printSchema()
    jsonobj2.show(false)

    // jsonobj3: pull each top-level JSON field out as its own (string) column
    val jsonobj3 = jsonobj2.select(
      get_json_object($"teacher", "$.dept").as("dept"),
      get_json_object($"teacher", "$.email").as("email"),
      get_json_object($"teacher", "$.name").as("name"),
      get_json_object($"teacher", "$.stu").as("stu"),
      get_json_object($"teacher", "$.tel").as("tel")
    )
    jsonobj3.printSchema()
    jsonobj3.show(false)

    // jsonobj4: same technique on the nested dept object. The teacher "name" column is
    // carried along; previously it was silently dropped from here on.
    val jsonobj4 = jsonobj3.select(
      get_json_object($"dept", "$.name").as("dept_name"),
      get_json_object($"dept", "$.describe").as("describe"),
      $"email", $"name", $"stu", $"tel"
    )
    jsonobj4.printSchema()
    jsonobj4.show(false)

    // jsonobj5: parse the stu JSON array into array<struct> using an explicit schema.
    // "school" is declared as StringType, so Spark keeps that nested object as raw JSON text.
    // Step 8 unpacks it with get_json_object.
    val stuSchema = ArrayType(StructType(
      StructField("grade", StringType) ::
        StructField("id", LongType) ::
        StructField("name", StringType) ::
        StructField("school", StringType) :: Nil
    ))
    val jsonobj5 = jsonobj4.select($"dept_name", $"describe", $"email", $"name", $"tel",
      from_json($"stu", stuSchema).as("stu"))
    jsonobj5.printSchema()
    jsonobj5.show(false)

    // jsonobj6: one row per student
    val jsonobj6 = jsonobj5.withColumn("stu", explode($"stu"))
    jsonobj6.printSchema()
    jsonobj6.show(false)

    // jsonobj7: stu is now a struct column, so its fields are read with dot syntax
    // instead of get_json_object
    val jsonobj7 = jsonobj6.select($"dept_name", $"describe", $"email", $"name", $"tel",
      $"stu.grade".as("stu_grade"),
      $"stu.id".as("stu_id"),
      $"stu.name".as("stu_name"),
      $"stu.school".as("stu_school")
    )
    jsonobj7.printSchema()
    jsonobj7.show(false)

    // jsonobj8: finally split the school JSON string into columns
    val jsonobj8 = jsonobj7.select($"dept_name", $"describe", $"email", $"name", $"tel",
      $"stu_grade", $"stu_id", $"stu_name",
      get_json_object($"stu_school", "$.address").as("sch_address"),
      get_json_object($"stu_school", "$.leader").as("sch_leader"),
      get_json_object($"stu_school", "$.name").as("sch_name")
    )
    jsonobj8.printSchema()
    jsonobj8.show(false)
  }
}
数据展示
三、日志数据加载
同样地,除了 JSON 格式的字符串,我们也可以用类似的方法来加载日志数据。
测试用日志数据文件:“log日志数据文件(测试用-test)”,见 CSDN 文库“其它文档类资源”。
文章来源:https://www.toymoban.com/news/detail-756975.html
package org.example.etl.util.test
import com.mysql.cj.util.SaslPrep
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SparkSession, functions, types}
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
/**
 * ETL demo: loads a tab-separated access log, keeps well-formed successful requests and
 * expands selected URL query parameters into their own columns.
 */
object etldemo {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("etl").setMaster("local")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // spark.stop() also stops the SparkContext; finally guarantees cleanup if a step fails.
    try run(spark)
    finally spark.stop()
  }

  /** Load -> filter -> expand pipeline; prints each intermediate DataFrame. */
  private def run(spark: SparkSession): Unit = {
    import spark.implicits._
    val sc = spark.sparkContext

    val rdd1 = sc.textFile("in/test.log")
    // rdd1.collect().foreach(println)

    // Split each log line on tabs, keep only lines with exactly 8 fields, and wrap them
    // in Rows so a DataFrame can be built with the explicit schema below.
    // NOTE(review): String.split drops trailing empty fields, so a line whose last column(s)
    // are empty yields fewer than 8 parts and is discarded; use split("\t", -1) to keep it.
    val rdd2 = rdd1.map(x => x.split("\t"))
      .filter(x => x.length == 8)
      .map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7)))
    // rdd2.collect().foreach(println)
    val schema = StructType(
      Array(
        StructField("event_time", StringType),
        StructField("url", StringType),
        StructField("method", StringType),
        StructField("status", StringType),
        StructField("sip", StringType),
        StructField("user_uip", StringType),
        StructField("action_prepend", StringType),
        StructField("action_client", StringType)
      )
    )
    val logdf1 = spark.createDataFrame(rdd2, schema)
    logdf1.printSchema()
    logdf1.show(5)

    // Deduplicate on (event_time, url), keep status 200 and drop rows with no event_time.
    // status is a string column; Spark implicitly casts it for the comparison with 200.
    // Fields come from String.split and are never null, so a missing event_time shows up
    // as "" — the isNotNull check on its own could never remove anything.
    val logdf2 = logdf1.dropDuplicates("event_time", "url")
      .filter($"status" === 200)
      .filter($"event_time".isNotNull && $"event_time" =!= "")
    logdf2.printSchema()
    logdf2.show(5, false)

    // Output columns, in exactly the order of the tuple built below.
    val schema2 = new StructType(Array(
      StructField("event_time", StringType),
      StructField("userUID", StringType),
      StructField("userSID", StringType),
      StructField("actionBegin", StringType),
      StructField("actionEnd", StringType),
      StructField("actionType", StringType),
      StructField("actionName", StringType),
      StructField("actionValue", StringType),
      StructField("actionTest", StringType),
      StructField("ifEquipment", StringType),
      StructField("method", StringType),
      StructField("status", StringType),
      StructField("sip", StringType),
      StructField("user_uip", StringType),
      StructField("action_prepend", StringType),
      StructField("action_client", StringType)
    ))

    val logdf3 = logdf2.map { line =>
      // key=value pairs from the query string. URLs that do not split into exactly two parts
      // on '?' yield an empty map. The map used to stay null in that case, and the
      // getOrElse calls below then threw a NullPointerException.
      val query: Map[String, String] = line.getAs[String]("url").split("\\?") match {
        case Array(_, qs) =>
          qs.split("&")
            .map(_.split("="))
            .collect { case Array(key, value) => key -> value }
            .toMap
        case _ => Map.empty
      }
      // getAs needs an explicit type parameter to return a typed value
      (
        line.getAs[String]("event_time"),
        query.getOrElse("userUID", ""),
        query.getOrElse("userSID", ""),
        query.getOrElse("actionBegin", ""),
        query.getOrElse("actionEnd", ""),
        query.getOrElse("actionType", ""),
        query.getOrElse("actionName", ""),
        query.getOrElse("actionValue", ""),
        query.getOrElse("actionTest", ""),
        query.getOrElse("ifEquipment", ""),
        line.getAs[String]("method"),
        line.getAs[String]("status"),
        line.getAs[String]("sip"),
        line.getAs[String]("user_uip"),
        line.getAs[String]("action_prepend"),
        line.getAs[String]("action_client")
      )
    }.toDF(schema2.fieldNames: _*) // name the tuple's _1.._16 columns directly, no RDD round trip
    logdf3.printSchema()
    logdf3.show(3, false)
  }
}
输出结果
文章来源地址https://www.toymoban.com/news/detail-756975.html
到了这里,关于大数据技术之Spark SQL——解析JSON字符串的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!