code学习

Spark Structured Streaming - 1

文章目录

  • ​​1.Spark Streaming和Structured Streaming​​
  • ​​1.1 Spark Streaming时代​​
  • ​​1.2 Structured Streaming时代​​
  • ​​1.3 Spark Streaming和Structured Streaming​​
  • ​​2.word Count​​
  • ​​3.Structured Streaming的体系和结构​​
  • ​​3.1 无限扩展的表格​​
  • ​​3.2 体系结构​​

1.Spark Streaming和Structured Streaming

1.1 Spark Streaming时代

是RDD的API的流式工具, 其本质还是RDD, 存储和执行过程依然类似RDD

Spark Structured Streaming - 1

1.2 Structured Streaming时代

是Dataset的API的流式工具,API和Dataset保持高度一致

Spark Structured Streaming - 1
Spark Structured Streaming - 1

1.3 Spark Streaming和Structured Streaming

  1. 进步就类似于Dataset相比于RDD的进步
  2. Structured Streaming已经支持了连续流模型, 也就是类似于Flink那样的实时流, 而不是小批量, 但在使用的时候仍然有限制, 大部分情况还是应该采用小批量模式
  3. 在2.2.0以后Structured Streaming被标注为稳定版本, 意味着以后的Spark流式开发不应该在采用Spark Streaming了

Spark Streaming:

  1. 基于微批,延迟高不能做到真正的实时
  2. DStream基于RDD,不直接支持SQL
  3. 流批处理的API应用层不统一,(流用的DStream-底层是RDD,批用的DF/DS/RDD)
  4. 不支持EventTime事件时间

    注:

    EventTime事件时间 :事件真正发生的事件

    PorcessingTime处理时间:事件被流系统处理的时间

    IngestionTime摄入时间:事件到底流系统的时间

    如: 一条错误日志10月1日,23:59:00秒产生的(事件时间),因为网路延迟,到10月2日 00:00:10到达日志处理系统(摄入时间),10月2日 00:00:20被流系统处理(处理时间)

    如果要统计10月1日的系统bug数量,那么SparkStreaming不能正确统计,因为它不支持事件时间

  5. 数据的Exactly-Once(恰好一次语义)需要手动实现

    注: 数据的一致性语义

    最多一次

    恰好一次–是我们的目标,SparkStreaming如果要实现恰好一次,需要手动维护偏移量+其他操作

    最少一次

Structured Streaming:

  1. 编程模型: 动态表格/无界表
  2. 数据抽象: DataFrame/DataSet
  3. 与Spark Sql 无缝连接

2.word Count

object SocketWordCount {
  def main(args: Array[String]): Unit = {
    // 1. 创建SparkSession
    val spark = SparkSession.builder()
      .master("local[6]")
      .appName("socket word count")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    import spark.implicits._

    // 2. 读取外部数据源,并转为Dataset[String]
    val source: Dataset[String] = spark.readStream
      .format("socket")
      .option("host", "192.168.88.100")
      .option("port", 7777)
      .load()
      .as[String]

    // 3. 统计词频
    val words = source.flatMap(_.split(" "))
      .map((_, 1))
      .groupByKey(_._1)
      .count()

    // 4. 输出结果
    words.writeStream
      .outputMode(OutputMode.Complete())   // 统计全局结果,而不是一个批次
      .format("console")
      .start()
      .awaitTermination()
  }
}      
Spark Structured Streaming - 1
Spark Structured Streaming - 1
  • Structured Streaming 依然是小批量的流处理
  • Structured Streaming 的输出是类似 DataFrame 的, 也具有 Schema, 所以也是针对结构化数据进行优化的
  • 从输出的时间特点上来看, 是一个批次先开始, 然后收集数据, 再进行展示, 这一点和 Spark Streaming 不太一样

3.Structured Streaming的体系和结构

3.1 无限扩展的表格

Spark中的DS有两种:

  1. 处理静态批量数据的DS:使用read和write进行读写
  2. 处理动态实时流的DS:使用readStream和writeStream进行读写

如何使用DS表示流式计算

  1. 可以把流式的数据想象成一个不断增长,无限无界的表
  2. 无论是否有界,全部都使用DS这一套的API,所以可以保障流和批的处理使用完全相同的代码
Spark Structured Streaming - 1
Spark Structured Streaming - 1

3.2 体系结构

StreamExecution

StreamExecution在流上进行基于Dataset的查询, 也就是说,Dataset之所以能够在流上进行查询, 是因为StreamExecution的调度和管理。

Spark Structured Streaming - 1
  1. Structured Streaming虽然从API角度上模拟出来的是一个无限扩展的表,但不能无限存储,并且历史数据中有很多是冗余的,所以要做增量存储。
  2. 所以这里设置了一个全局范围的高可用StateStore,这个时候针对增量的查询变为如下步骤:

    1)从StateStore中取出上次执行完成后的状态

    2)把上次执行的结果加入本批次,再进行计算,得出全局结果

    3)将当前批次的结果放入StateStore中,留待下次使用

继续阅读