大数据技术之Spark SQL——解析JSON字符串

这篇具有很好参考价值的文章主要介绍了大数据技术之Spark SQL——解析JSON字符串。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、数据准备

1)Department

 2)School

3)Student

4)Teacher

5)实例化对象

结构如下:

二、加载数据

数据展示 

三、日志数据加载

输出结果 


一、数据准备

spark解析json字符串,# spark,sql,json,java

1)Department

package org.example.jsonre;

public class Department {
    private String name;
    private String describe;

    @Override
    public String toString() {
        return "Department{" +
                "name='" + name + '\'' +
                ", describe='" + describe + '\'' +
                '}';
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getDescribe() {
        return describe;
    }

    public void setDescribe(String describe) {
        this.describe = describe;
    }

    public Department() {
    }

    public Department(String name, String describe) {
        this.name = name;
        this.describe = describe;
    }
}

 2)School

package org.example.jsonre;

public class School {
    private String name;
    private String leader;
    private String address;

    public School() {
    }

    public School(String name, String leader, String address) {
        this.name = name;
        this.leader = leader;
        this.address = address;
    }

    @Override
    public String toString() {
        return "School{" +
                "name='" + name + '\'' +
                ", leader='" + leader + '\'' +
                ", address='" + address + '\'' +
                '}';
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getLeader() {
        return leader;
    }

    public void setLeader(String leader) {
        this.leader = leader;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }
}

3)Student

package org.example.jsonre;

import com.alibaba.fastjson.JSON;

import java.io.*;
import java.util.ArrayList;

public class Student {
    private Integer id;
    private String name;
    private String grade;
    private School school;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getGrade() {
        return grade;
    }

    public void setGrade(String grade) {
        this.grade = grade;
    }

    public School getSchool() {
        return school;
    }

    public void setSchool(School school) {
        this.school = school;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", grade='" + grade + '\'' +
                ", school=" + school +
                '}';
    }

    public Student() {
    }

    public Student(Integer id, String name, String grade, School school) {
        this.id = id;
        this.name = name;
        this.grade = grade;
        this.school = school;
    }
}

4)Teacher

package org.example.jsonre;


import java.util.ArrayList;

public class Teacher {
    private Integer id;
    private String name;
    private String tel;
    private String email;
    private Department dept;

    @Override
    public String toString() {
        return "Teacher{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", tel='" + tel + '\'' +
                ", email='" + email + '\'' +
                ", dept=" + dept +
                ", stu=" + stu +
                '}';
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getTel() {
        return tel;
    }

    public void setTel(String tel) {
        this.tel = tel;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

    public Department getDept() {
        return dept;
    }

    public void setDept(Department dept) {
        this.dept = dept;
    }

    public ArrayList<Student> getStu() {
        return stu;
    }

    public void setStu(ArrayList<Student> stu) {
        this.stu = stu;
    }

    public Teacher() {
    }

    public Teacher(Integer id, String name, String tel, String email, Department dept, ArrayList<Student> stu) {
        this.id = id;
        this.name = name;
        this.tel = tel;
        this.email = email;
        this.dept = dept;
        this.stu = stu;
    }

    private ArrayList<Student> stu;

}

5)实例化对象

public class Test {

    public static void main(String[] args) {
// 数据装载
                Teacher teacher = new Teacher();

        School school = new School();
        school.setAddress("安德门");
        school.setLeader("王德发");
        school.setName("南京某大学");
        Department department = new Department();
        department.setName("学术部");
        department.setDescribe("主要负责教学研究");
        Student stu1 = new Student();
        stu1.setId(1);
        stu1.setName("张三");
        stu1.setGrade("一年级");

        stu1.setSchool(school);
        Student stu2 = new Student();
        stu2.setId(2);
        stu2.setName("李四");
        stu2.setGrade("一年级");
        School school3 = new School();
        School school2 = new School();
        school2.setAddress("安德门");
        school2.setLeader("王德发");
        school2.setName("南京某大学");
        stu2.setSchool(school2);
        Student stu3 = new Student();
        stu3.setId(3);
        stu3.setName("赵六");
        stu3.setGrade("二年级");
        school3.setAddress("安德门");
        school3.setLeader("王德发");
        school3.setName("南京某大学");
        stu3.setSchool(school3);
        ArrayList<Student> students = new ArrayList<>();
        students.add(stu1);
        students.add(stu2);
        students.add(stu3);
        teacher.setDept(department);
        teacher.setStu(students);
        teacher.setEmail("fivedessert@gmail.cn");
        teacher.setName("five小点心");
        teacher.setTel("1231231234");

        String jsonString = JSON.toJSONString(teacher);
        System.out.println(jsonString);


        FileWriter fr = null;
        try {
            fr = new FileWriter("out/jsonTest.txt");
            fr.write(jsonString);

            fr.flush();

        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            fr.close();
        } catch (IOException e) {
            e.printStackTrace();
        }



    }
}
{"dept":{"describe":"主要负责教学研究","name":"学术部"},"email":"fivedessert@gmail.cn","name":"five小点心","stu":[{"grade":"一年级","id":1,"name":"张三","school":{"address":"安德门","leader":"王德发","name":"南京某大学"}},{"grade":"一年级","id":2,"name":"李四","school":{"address":"安德门","leader":"王德发","name":"南京某大学"}},{"grade":"二年级","id":3,"name":"赵六","school":{"address":"安德门","leader":"王德发","name":"南京某大学"}}],"tel":"1231231234"}

结构如下:

spark解析json字符串,# spark,sql,json,java

二、加载数据

sc.textFile读取数据源,并对结构化数据进行拆分

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object JsonTest {
  def main(args: Array[String]): Unit = {
// 定义sc配置
    val conf = new SparkConf().setMaster("local").setAppName("JsonTest")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val sc = spark.sparkContext

// 导入包
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    import spark.implicits._

// jsonobj1:读取数据,输出rdd数据
    val jsonobj1 = sc.textFile("out/jsonTest.txt")
    jsonobj1.collect().foreach(println)

// jsonobj2:将rdd数据转换成DF类型数据
    val jsonobj2 = jsonobj1.toDF("teacher")
    jsonobj2.printSchema()
    jsonobj2.show(false)

// jsonobj3:将json类型数据按照需求进行切分
    val jsonobj3 = jsonobj2.select(
      get_json_object($"teacher","$.dept").as("dept"),
      get_json_object($"teacher","$.email").as("email"),
      get_json_object($"teacher","$.name").as("name"),
      get_json_object($"teacher","$.stu").as("stu"),
      get_json_object($"teacher","$.tel").as("tel")
    )
    jsonobj3.printSchema()
    jsonobj3.show(false)

// jsonobj4:与obj3类似,将dept数据进行进一步拆分
    val jsonobj4 = jsonobj3.select(
      get_json_object($"dept", "$.name").as("dept_name"),
      get_json_object($"dept", "$.describe").as("describe"),
      $"email", $"stu", $"tel"
    )
    jsonobj4.printSchema()
    jsonobj4.show(false)

// jsonobj5:将stu的数据进行结构化处理,需要定义数据类型
    val jsonobj5 = jsonobj4.select($"dept_name",$"describe",$"email",$"tel"
      ,from_json($"stu",ArrayType(StructType(
          StructField("grade", StringType) ::
            StructField("id", LongType) ::
            StructField("name", StringType) ::
            StructField("school", StringType) ::Nil
      ))).as("stu"))
    jsonobj5.printSchema()
    jsonobj5.show(false)

// jsonobj6:将stu数据分成多行
    val jsonobj6 = jsonobj5.withColumn("stu",explode($"stu"))
    jsonobj6.printSchema()
    jsonobj6.show(false)

// jsonobj7:obj6的数据继续拆分
// 由于是结构化数据,这里调用的方式与json略有不同
    val jsonobj7 = jsonobj6.select($"dept_name",$"describe",$"email",$"tel",
      $"stu.grade".as("stu_grade"),
      $"stu.id".as("stu_id"),
      $"stu.name".as("stu_name"),
      $"stu.school".as("stu_school")
    )
    jsonobj7.printSchema()
    jsonobj7.show(false)

// jsonobj8:将剩余数据进行拆分
    val jsonobj8 = jsonobj7.select($"dept_name",$"describe",$"email",$"tel",$"stu_grade",$"stu_id",$"stu_name",
      get_json_object($"stu_school","$.address").as("sch_address"),
      get_json_object($"stu_school","$.leader").as("sch_leader"),
      get_json_object($"stu_school","$.name").as("sch_name")
    )
    jsonobj8.printSchema()
    jsonobj8.show()

// 关闭
    spark.close()
    sc.stop()
  }

}

数据展示 

spark解析json字符串,# spark,sql,json,java spark解析json字符串,# spark,sql,json,java

spark解析json字符串,# spark,sql,json,java spark解析json字符串,# spark,sql,json,java

spark解析json字符串,# spark,sql,json,java spark解析json字符串,# spark,sql,json,java

 spark解析json字符串,# spark,sql,json,java

 spark解析json字符串,# spark,sql,json,java

三、日志数据加载

同样的,除了json格式字符串,我们也可以用类似的方法来加载日志数据。

log日志数据文件(测试用-test)-其它文档类资源-CSDN文库

package org.example.etl.util.test

import com.mysql.cj.util.SaslPrep
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SparkSession, functions, types}
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object etldemo {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("etl").setMaster("local")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val sc = spark.sparkContext

    import spark.implicits._

    val rdd1 = sc.textFile("in/test.log")
//    rdd1.collect().foreach(println)

    // 加载日志数据,按照\t分隔,过滤出长度为8的数据
    // 将数据封装到Row对象,创建DF
    val rdd2 = rdd1.map(x => x.split("\t"))
      .filter(x => x.length == 8)
      .map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7)))
//    rdd2.collect().foreach(println)

    val schema = StructType(
      Array(
        StructField("event_time", StringType),
        StructField("url", StringType),
        StructField("method", StringType),
        StructField("status", StringType),
        StructField("sip", StringType),
        StructField("user_uip", StringType),
        StructField("action_prepend", StringType),
        StructField("action_client", StringType)
      )
    )

    val logdf1 = spark.createDataFrame(rdd2,schema)
    logdf1.printSchema()
    logdf1.show(5)

    //    过滤数据:去除重复项 + status=200 + event_time不为空
        val logdf2 = logdf1.dropDuplicates("event_time", "url")
          .filter($"status" === 200)
          .filter($"event_time".isNotNull)
    logdf2.printSchema()
    logdf2.show(5,false)

    val logrdd = logdf2.map(line => {
      val str = line.getAs[String]("url")
      val strArray = str.split("\\?")
      var strMap: Map[String, String] = null;
      if (strArray.length == 2) {
        val tuples: Array[(String, String)] = strArray(1).split("&")
          .map(x => x.split("="))
          .filter(x => x.length == 2)
          .map(x => (x(0), x(1)))
        strMap = tuples.toMap
      }
      // 返回值,这里getAs后必须跟类型值
      (
        line.getAs[String]("event_time"),
        strMap.getOrElse("userUID", ""),
        strMap.getOrElse("userSID", ""),
        strMap.getOrElse("actionBegin", ""),
        strMap.getOrElse("actionEnd", ""),
        strMap.getOrElse("actionType", ""),
        strMap.getOrElse("actionName", ""),
        strMap.getOrElse("actionValue", ""),
        strMap.getOrElse("actionTest", ""),
        strMap.getOrElse("ifEquipment", ""),
        line.getAs[String]("method"),
        line.getAs[String]("status"),
        line.getAs[String]("sip"),
        line.getAs[String]("user_uip"),
        line.getAs[String]("action_prepend"),
        line.getAs[String]("action_client"),
      )
    }
    ).toDF()    // 这时候输出的数据类型为元组,需要对其列进行指定
        .rdd

    val schema2 = new StructType(Array(
      StructField("event_time",StringType),
      StructField("userUID",StringType),
      StructField("userSID",StringType),
      StructField("actionBegin",StringType),
      StructField("actionEnd",StringType),
      StructField("actionType",StringType),
      StructField("actionName",StringType),
      StructField("actionValue",StringType),
      StructField("actionTest",StringType),
      StructField("ifEquipment",StringType),
      StructField("method",StringType),
      StructField("status",StringType),
      StructField("sip",StringType),
      StructField("user_uip",StringType),
      StructField("action_prepend",StringType),
      StructField("action_client",StringType)
    ))

    val logdf3 = spark.createDataFrame(logrdd, schema2)
    logdf3.printSchema()
    logdf3.show(3,false)

    spark.close()
    sc.stop()
  }
}

输出结果 

spark解析json字符串,# spark,sql,json,java文章来源地址https://www.toymoban.com/news/detail-756975.html

到了这里,关于大数据技术之Spark SQL——解析JSON字符串的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • Oracle解析JSON字符串

    Oracle解析JSON字符串

    假设某个字段存储的JSON字符串,我们不想查出来后通过一些常见的编程语言处理( JSON.parse() 或者是 JSONObject.parseObject() 等),想直接在数据库上处理,又该如何书写呢? 其实在 ORACLE 中也支持多种机制去处理JSON数据,例如有操作函数JSON_ARRAY、JSON_EXISTS、JSON_VALUES、JSON_TABLE、

    2024年02月16日
    浏览(12)
  • 7. Hive解析JSON字符串、JSON数组

    Hive解析JSON字符串 1. get_json_object 语法: get_json_object(json_string, path) json_string 是要解析的JSON字符串 path 是用于指定要提取的字段路径的字符串 局限性 get_json_object 函数的性能会受到 JSON数据的结构和大小 的影响。对于较复杂的嵌套结构,考虑使用Hive的其他函数或自定义函数来

    2024年02月11日
    浏览(10)
  • java解析多层嵌套json字符串

    在java 程序中,经常会涉及到各种变量值提取的问题,下面给出简单的示例及说明: JSON实际上也是键值对(\\\"key\\\":\\\"value\\\"),key 必须是字符串,value 可以是合法的 JSON 数据类型(字符串, 数字, 对象, 数组, 布尔值或 null) value如果是字符串,用jsonobj.getString(\\\"key\\\")获取 value如果是数

    2024年02月15日
    浏览(11)
  • 一个退役中校教你如何用go语言写一个基于B+树的json数据库(进阶篇)之json字符串解析为BsTr结构(一)

    1.对象式json字符串 s := \\\"{\\\"put\\\":{\\\"putjsontest\\\":{\\\"aaa\\\":\\\"sdftsdfs\\\\dfe29asdf\\\",\\\"aaab\\\":true,\\\"arrarrstrct\\\":{\\\"nnn\\\":-1234567890,\\\"ccc\\\":[[\\\"sdftsdfs\\\\dfe29asdf\\\",\\\"nmbndfvdfgfdg\\\"],[\\\"sdftsdfs\\\\dfe29asdf\\\",\\\"poiuiyyttt\\\"]]},\\\"ddd\\\":\\\"sdftsdfs\\\\dfe29asdf\\\",\\\"fff\\\":false,\\\"comboolarr\\\":[{\\\"boolarr0\\\":[true,false]},{\\\"boolarr1\\\":[true,false]}]}

    2024年02月21日
    浏览(12)
  • C#2010 .NET4 解析 json 字符串

    C#2010 .NET4 解析 json 字符串

    下载Newtonsoft.Json.dll  

    2024年02月11日
    浏览(11)
  • 五分钟学会如何用java解析json字符串!

    在工作中我们常常使用json来存储和传输结构化的数据,如用户信息、配置信息等。它通常以string的方式进行传输,因此如何将string解析并得到我们想要的信息是一项必备且常用的功能。 JSON(JavaScript Object Notation)是一种轻量级的、基于文本的、与语言无关的数据交换格式,

    2024年02月11日
    浏览(12)
  • Golang校验字符串是否JSON格式方法json.Valid源码解析

    上篇文章《Golang中如何校验字符串是否为JSON格式?》主要讲解了使用json.Valid校验字符串是否JSON格式的使用方法,本文来剖析一下json.Valid方法的源码。 json.Valid方法定义: scan := newScanner() 获取一个 scanner 类型的对象,关键的是checkValid方法,checkValid源码如下: 首先调用了sc

    2023年04月26日
    浏览(9)
  • JSON对象字符串在C#中进行像sql一样动态查询

    在C#中,我们可以使用多种方法来根据条件动态查询JSON对象字符串数据,类似于SQL语句查询。 使用JObject JObject是Json.NET中的一个类,可以方便地操作JSON对象。通过JObject,我们可以像使用SQL一样使用LINQ查询语句来查询JSON对象。 示例代码: 使用JsonPath JsonPath是一种基于JSON对象

    2023年04月14日
    浏览(12)
  • 【微信小程序】使用 JSON.parse 方法将返回的 JSON 字符串解析为对象

    在微信小程序中,你可以使用 JSON.parse 方法将返回的 JSON 字符串解析为对象。以下是实现类似功能的示例代码: 在上述代码中,我们使用 JSON.parse 方法将返回的 JSON 字符串解析为对象。然后,我们可以通过点语法或方括号语法访问解析后的对象的属性,例如 data.result 。 请注

    2024年02月15日
    浏览(8)
  • gin 解析 formData带文件和json字符串的一起的方式

    基础不好真的难受,这点问题折腾半天… 网上看到很多单独解析文件的,网上的资料确实都能找到。但是结合gin解析文件和json字符串(或其他类型的参数,但字符串最好)一起的,就不好找。 需求是前端使用 Content-Type: multipart/form-data 的形式上传文件以及需要的参数。后端

    2024年02月16日
    浏览(10)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包