
Spark Streaming Real-Time Data Analysis

[kfk@bigdata-pro01 softwares]$ sudo rpm -ivh nc-1.84-22.el6.x86_64.rpm 
Preparing...                ########################################### [100%]
   1:nc                     ########################################### [100%]
[kfk@bigdata-pro01 softwares]$      
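With nc installed, a listener has to be running on the port before the word-count example can receive anything. A typical invocation (standard nc flags) looks like this:

```shell
# Start a netcat server on port 9999:
#   -l  listen for an incoming connection
#   -k  keep listening after a client disconnects
nc -lk 9999
```

Whatever is typed into this window is sent, line by line, to any connected client — here, the Spark Streaming socket receiver.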

Open a second remote terminal window.

bin/run-example streaming.NetworkWordCount localhost 9999      

Go back to the nc window and type in some text.


You can see that the data has been received on this side.


Let's exit and start it a different way.


Now enter some more data on this side.


Processing over here is very fast.


Because far too much log output is being printed, I'll tweak the log configuration file (on all 3 nodes, just to be safe).


Let's run it again.


Back in the nc window, type a few words in.


Type the same word several times and see what the result looks like.


You can see that it counts the occurrences.
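What the streaming job does to each batch is the classic word-count pipeline. A minimal sketch of the same per-batch logic on a plain Scala collection (the sample lines are hypothetical, and `groupBy` plus a sum stands in for `reduceByKey` locally):

```scala
// Hypothetical sample batch: each element is one line received from nc
val lines = List("spark spark streaming", "spark hello")

val wordCounts = lines
  .flatMap(_.split(" "))                          // split each line into words
  .map(word => (word, 1))                         // pair each word with a count of 1
  .groupBy(_._1)                                  // local equivalent of reduceByKey
  .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

println(wordCounts)                               // e.g. spark -> 3, streaming -> 1, hello -> 1
```

On a real DStream the same shape is expressed with `flatMap`, `map`, and `reduceByKey(_ + _)`, applied independently to every 5-second batch.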


Next let's start spark-shell, and we find it reports an error.


That's because we earlier wired Hive into Spark SQL; let's revert that configuration first (change all 3 nodes).


Start it again.


Now enter the code:

scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._

scala> import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.StreamingContext._

scala> val ssc = new StreamingContext(sc, Seconds(5))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@431f8830

scala> val lines = ssc.socketTextStream("localhost", 9999)
lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@23f27434

scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@2c3df478

scala> val pairs = words.map(word => (word, 1))
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@6d3dc0c5

scala> val wordCounts = pairs.reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@8fa4647

scala> wordCounts.print()

scala>      

Finally, when we start the job, it reports an error.


That's because nc hasn't been started.

Start it now.


Type in some data.


Exit and start it once more.


Paste the code in again.


Enter data on the nc side.


Come back here and check the result.


Now open IDEA.

package com.spark.test

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming._

object Test {

  def main(args: Array[String]): Unit = {
    // local[2]: one thread for the socket receiver, one for processing
    val spark = SparkSession
      .builder
      .master("local[2]")
      .appName("HdfsTest")
      .getOrCreate()

    // 5-second batch interval
    val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" ")) // skeleton only; the full job follows below
  }
}
package com.spark.test

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TestStreaming {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[2]")
      .appName("streaming").getOrCreate()

    val sc = spark.sparkContext
    val ssc = new StreamingContext(sc, Seconds(5))
    val lines = ssc.socketTextStream("bigdata-pro01.kfk.com", 9999)
    // split each line into words, pair each word with 1, and sum the counts per word
    val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Note the order of operations here: start nc first, then run the program, then type words into the nc window (the socket receiver needs a listener to connect to).

package com.spark.test

import java.sql.DriverManager

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TestStreaming {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[2]")
      .appName("streaming").getOrCreate()

    val sc = spark.sparkContext
    val ssc = new StreamingContext(sc, Seconds(5))
    val lines = ssc.socketTextStream("bigdata-pro01.kfk.com", 9999)
    // split each line into words, pair each word with 1, and sum the counts per word
    val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

    // write each batch to MySQL: one connection per partition, and a
    // parameterized statement instead of string concatenation
    wordCounts.foreachRDD(rdd => rdd.foreachPartition(partition => {
      Class.forName("com.mysql.jdbc.Driver")
      val conn = DriverManager
        .getConnection("jdbc:mysql://bigdata-pro01.kfk.com:3306/test", "root", "root")
      val stmt = conn.prepareStatement(
        "insert into webCount(titleName, count) values (?, ?)")
      try {
        for ((titleName, count) <- partition) {
          stmt.setString(1, titleName)
          stmt.setInt(2, count)
          stmt.executeUpdate()
        }
      } finally {
        stmt.close()
        conn.close()
      }
    }))

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
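The reason the connection is opened inside `foreachPartition` rather than once per record (or once on the driver) is that JDBC connections are not serializable and are expensive to create. A minimal local sketch of that pattern, with a hypothetical `Conn` class standing in for `java.sql.Connection`:

```scala
// Hypothetical stand-in for a JDBC connection
class Conn {
  def insert(row: (String, Int)): Unit = () // no-op "write"
  def close(): Unit = ()
}

var opened = 0 // count how many connections get created
def getConnection(): Conn = { opened += 1; new Conn }

// Two "partitions" of (word, count) records, mimicking rdd.foreachPartition
val partitions = List(
  List(("spark", 3), ("hello", 1)), // partition 1
  List(("streaming", 2))            // partition 2
)

partitions.foreach { partition =>
  val conn = getConnection()        // one connection per partition
  try partition.foreach(conn.insert)
  finally conn.close()
}

println(opened) // 2 connections for 3 records: per-partition, not per-record
```

With `conn` created per record instead, a busy stream would open a connection for every word in every batch; created on the driver, it would fail to serialize to the executors.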

Now copy the code into the spark-shell:

import java.sql.DriverManager
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("bigdata-pro01.kfk.com", 9999)
val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.foreachRDD(rdd => rdd.foreachPartition(partition => {
  Class.forName("com.mysql.jdbc.Driver")
  val conn = DriverManager
    .getConnection("jdbc:mysql://bigdata-pro01.kfk.com:3306/test", "root", "root")
  val stmt = conn.prepareStatement("insert into webCount(titleName, count) values (?, ?)")
  try {
    for ((titleName, count) <- partition) {
      stmt.setString(1, titleName)
      stmt.setInt(2, count)
      stmt.executeUpdate()
    }
  } finally {
    stmt.close()
    conn.close()
  }
}))
ssc.start()
ssc.awaitTermination()

Enter some data.

