實時統(tǒng)計每天pv,uv的sparkStreaming結合redis結果存入mysql供前端展示
最近有個需求,實時統(tǒng)計pv,uv
,結果按照date,hour,pv,uv
來展示,按天統(tǒng)計,第二天重新統(tǒng)計,當然了實際還需要按照類型字段分類統(tǒng)計pv,uv
,比如按照date,hour,pv,uv,type
來展示。這里介紹最基本的pv,uv
的展示。
1、項目流程
日志數(shù)據(jù)從flume
采集過來,落到hdfs
供其它離線業(yè)務使用,也會sink
到kafka
,sparkStreaming
從kafka
拉數(shù)據(jù)過來,計算pv,uv
,uv
是用的redis
的set
集合去重,最后把結果寫入mysql
數(shù)據(jù)庫,供前端展示使用。
2、具體過程
1)pv
的計算
拉取數(shù)據(jù)有兩種方式,基于received
和direct
方式,這里用direct
直拉的方式,用的mapWithState
算子保存狀態(tài),這個算子與updateStateByKey
一樣,并且性能更好。當然了實際中數(shù)據(jù)過來需要經(jīng)過清洗,過濾,才能使用。
定義一個狀態(tài)函數(shù)
這樣就很容易的把pv
計算出來了。
2)uv的計算
uv是要全天去重的,每次進來一個batch
的數(shù)據(jù),如果用原生的reduceByKey
或者groupByKey
對配置要求太高,在配置較低情況下,我們申請了一個93G
的redis
用來去重,原理是每進來一條數(shù)據(jù),將date
作為key
,guid
加入set
集合,20
秒刷新一次,也就是將set
集合的尺寸取出來,更新一下數(shù)據(jù)庫即可。
redis連接池代碼RedisPoolUtil.scala
:
3)結果保存到數(shù)據(jù)庫
結果保存到mysql
,數(shù)據(jù)庫,20
秒刷新一次數(shù)據(jù)庫,前端展示刷新一次,就會重新查詢一次數(shù)據(jù)庫,做到實時統(tǒng)計展示pv,uv
的目的。
msql 連接池代碼MysqlPoolUtil.scala
4)數(shù)據(jù)容錯
流處理消費kafka都會考慮到數(shù)據(jù)丟失問題,一般可以保存到任何存儲系統(tǒng),包括mysql,hdfs,hbase,redis,zookeeper
等到。這里用SparkStreaming
自帶的checkpoint
機制來實現(xiàn)應用重啟時數(shù)據(jù)恢復。
checkpoint
這里采用的是checkpoint
機制,在重啟或者失敗后重啟可以直接讀取上次沒有完成的任務,從kafka
對應offset
讀取數(shù)據(jù)。
checkpoint是每天一個目錄,在第二天凌晨定時銷毀StreamingContext對象,重新統(tǒng)計計算pv,uv。
注意
ssc.stop(false,true)
表示優(yōu)雅地銷毀StreamingContext
對象,不能銷毀SparkContext
對象,ssc.stop(true,true)
會停掉SparkContext
對象,程序就直接停了。
應用遷移或者程序升級
在這個過程中,我們把應用升級了一下,比如說某個功能寫的不夠完善,或者有邏輯錯誤,這時候都是需要修改代碼,重新打jar包的,這時候如果把程序停了,新的應用還是會讀取老的checkpoint
,可能會有兩個問題:
- 執(zhí)行的還是上一次的程序,因為
checkpoint
里面也有序列化的代碼;- 直接執(zhí)行失敗,反序列化失?。?/li>
其實有時候,修改代碼后不用刪除checkpoint
也是可以直接生效,經(jīng)過很多測試,我發(fā)現(xiàn)如果對數(shù)據(jù)的過濾操作導致數(shù)據(jù)過濾邏輯改變,還有狀態(tài)操作保存修改,也會導致重啟失敗,只有刪除checkpoint才行,可是實際中一旦刪除checkpoint
,就會導致上一次未完成的任務和消費kafka
的offset
丟失,直接導致數(shù)據(jù)丟失,這種情況下我一般這么做。
這種情況一般是在另外一個集群,或者把
checkpoint
目錄修改下,我們是代碼與配置文件分離,所以修改配置文件checkpoint
的位置還是很方便的。然后兩個程序一起跑,除了checkpoint
目錄不一樣,會重新建,都插入同一個數(shù)據(jù)庫,跑一段時間后,把舊的程序停掉就好。以前看官網(wǎng)這么說,只能記住不能清楚明了,只有自己做時才會想一下辦法去保證數(shù)據(jù)準確。
5)日志
日志用的log4j2
,本地保存一份,ERROR
級別的日志會通過郵件發(fā)送到郵箱。
3、主要代碼
需要的maven
依賴:
讀取配置文件代碼ConfigFactory .java
:
主要業(yè)務代碼,如下:
package com.js.ipflow.flash.helper
import java.sql.Connection
import java.util.{Calendar, Date}
import com.alibaba.fastjson.JSON
import com.js.ipflow.start.ConfigFactory
import com.js.ipflow.utils.{DateUtil, MysqlPoolUtil, RedisPoolUtil}
import kafka.serializer.StringDecoder
import org.apache.logging.log4j.LogManager
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.Jedis
object HelperHandle {
val logger = LogManager.getLogger(HelperHandle.getClass.getSimpleName)
// 郵件level=error日志
val logger2 = LogManager.getLogger("email")
def main(args: Array[String]): Unit = {
helperHandle(args(0))
}
def helperHandle(consumeRate: String): Unit = {
// 初始化配置文件
ConfigFactory.initConfig()
val conf = new SparkConf().setAppName(ConfigFactory.sparkstreamname)
conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
conf.set("spark.streaming.kafka.maxRatePerPartition", consumeRate)
conf.set("spark.default.parallelism", "30")
val sc = new SparkContext(conf)
while (true) {
val ssc = StreamingContext.getOrCreate(ConfigFactory.checkpointdir + DateUtil.getDay(0), getStreamingContext _)
ssc.start()
ssc.awaitTerminationOrTimeout(resetTime)
ssc.stop(false, true)
}
def getStreamingContext(): StreamingContext = {
val stateSpec = StateSpec.function(mapFunction)
val ssc = new StreamingContext(sc, Seconds(ConfigFactory.sparkstreamseconds))
ssc.checkpoint(ConfigFactory.checkpointdir + DateUtil.getDay(0))
val zkQuorm = ConfigFactory.kafkazookeeper
val topics = ConfigFactory.kafkatopic
val topicSet = Set(topics)
val kafkaParams = Map[String, String](
"metadata.broker.list" -> (ConfigFactory.kafkaipport)
, "group.id" -> (ConfigFactory.kafkagroupid)
, "auto.offset.reset" -> kafka.api.OffsetRequest.LargestTimeString
)
val rmessage = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicSet
)
// helper數(shù)據(jù) (dateHour,guid,helperversion)
val helper_data = FilterHelper.getHelperData(rmessage.map(x => {
val message = JSON.parseObject(x._2).getString("message")
JSON.parseObject(message)
})).repartition(60).cache()
// (guid, datehour + helperversion)
val helper_data_dis = helper_data.map(x => (x._2, addTab(x._1) + x._3)).reduceByKey((x, y) => y)
// pv,uv
val helper_count = helper_data.map(x => (x._1, 1L)).mapWithState(stateSpec).stateSnapshots().repartition(2)
// helperversion
val helper_helperversion_count = helper_data.map(x => (addTab(x._1) + x._3, 1L)).mapWithState(stateSpec).stateSnapshots().repartition(2)
helper_data_dis.foreachRDD(rdd => {
rdd.foreachPartition(eachPartition => {
var jedis: Jedis = null
try {
jedis = getJedis
eachPartition.foreach(x => {
val arr = x._2.split("\t")
val date: String = arr(0).split(":")(0)
// helper 統(tǒng)計
val key0 = "helper_" + date
jedis.sadd(key0, x._1)
jedis.expire(key0, ConfigFactory.rediskeyexists)
// helperversion 統(tǒng)計
val key = date + "_" + arr(1)
jedis.sadd(key, x._1)
jedis.expire(key, ConfigFactory.rediskeyexists)
})
} catch {
case e: Exception => {
logger.error(e)
logger2.error(HelperHandle.getClass.getSimpleName + e)
}
} finally {
if (jedis != null) {
closeJedis(jedis)
}
}
})
})
insertHelper(helper_helperversion_count, "statistic_realtime_flash_helper", "date", "hour", "count_all", "count", "helperversion", "datehour", "dh")
insertHelper(helper_count, "statistic_realtime_helper_count", "date", "hour", "helper_count_all", "helper_count", "dh")
ssc
}
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// 計算當前時間距離次日零點的時長(毫秒)
def resetTime = {
val now = new Date()
val todayEnd = Calendar.getInstance
todayEnd.set(Calendar.HOUR_OF_DAY, 23) // Calendar.HOUR 12小時制
todayEnd.set(Calendar.MINUTE, 59)
todayEnd.set(Calendar.SECOND, 59)
todayEnd.set(Calendar.MILLISECOND, 999)
todayEnd.getTimeInMillis - now.getTime
}
/**
* 插入數(shù)據(jù)
*
* @param data (addTab(datehour)+helperversion)
* @param tbName
* @param colNames
*/
def insertHelper(data: DStream[(String, Long)], tbName: String, colNames: String*): Unit = {
data.foreachRDD(rdd => {
val tmp_rdd = rdd.map(x => x._1.substring(11, 13).toInt)
if (!rdd.isEmpty()) {
val hour_now = tmp_rdd.max() // 獲取當前結果中最大的時間,在數(shù)據(jù)恢復中可以起作用
rdd.foreachPartition(eachPartition => {
var jedis: Jedis = null
var conn: Connection = null
try {
jedis = getJedis
conn = MysqlPoolUtil.getConnection()
conn.setAutoCommit(false)
val stmt = conn.createStatement()
eachPartition.foreach(x => {
if (colNames.length == 7) {
val datehour = x._1.split("\t")(0)
val helperversion = x._1.split("\t")(1)
val date_hour = datehour.split(":")
val date = date_hour(0)
val hour = date_hour(1).toInt
val colName0 = colNames(0) // date
val colName1 = colNames(1) // hour
val colName2 = colNames(2) // count_all
val colName3 = colNames(3) // count
val colName4 = colNames(4) // helperversion
val colName5 = colNames(5) // datehour
val colName6 = colNames(6) // dh
val colValue0 = addYin(date)
val colValue1 = hour
val colValue2 = x._2.toInt
val colValue3 = jedis.scard(date + "_" + helperversion) // // 2018-07-08_10.0.1.22
val colValue4 = addYin(helperversion)
var colValue5 = if (hour < 10) "'" + date + " 0" + hour + ":00 " + helperversion + "'" else "'" + date + " " + hour + ":00 " + helperversion + "'"
val colValue6 = if (hour < 10) "'" + date + " 0" + hour + ":00'" else "'" + date + " " + hour + ":00'"
var sql = ""
if (hour == hour_now) { // uv只對現(xiàn)在更新
sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName3},${colName4},${colName5},${colName6}) values(${colValue0},${colValue1},${colValue2},${colValue3},${colValue4},${colValue5},${colValue6}) on duplicate key update ${colName2} = ${colValue2},${colName3} = ${colValue3}"
logger.warn(sql)
stmt.addBatch(sql)
} /* else {
sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName4},${colName5},${colName6}) values(${colValue0},${colValue1},${colValue2},${colValue4},${colValue5},${colValue6}) on duplicate key update ${colName2} = ${colValue2}"
}*/
} else if (colNames.length == 5) {
val date_hour = x._1.split(":")
val date = date_hour(0)
val hour = date_hour(1).toInt
val colName0 = colNames(0) // date
val colName1 = colNames(1) // hour
val colName2 = colNames(2) // helper_count_all
val colName3 = colNames(3) // helper_count
val colName4 = colNames(4) // dh
val colValue0 = addYin(date)
val colValue1 = hour
val colValue2 = x._2.toInt
val colValue3 = jedis.scard("helper_" + date) // // helper_2018-07-08
val colValue4 = if (hour < 10) "'" + date + " 0" + hour + ":00'" else "'" + date + " " + hour + ":00'"
var sql = ""
if (hour == hour_now) { // uv只對現(xiàn)在更新
sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName3},${colName4}) values(${colValue0},${colValue1},${colValue2},${colValue3},${colValue4}) on duplicate key update ${colName2} = ${colValue2},${colName3} = ${colValue3}"
logger.warn(sql)
stmt.addBatch(sql)
}
}
})
stmt.executeBatch() // 批量執(zhí)行sql語句
conn.commit()
} catch {
case e: Exception => {
logger.error(e)
logger2.error(HelperHandle.getClass.getSimpleName + e)
}
} finally {
if (jedis != null) {
closeJedis(jedis)
}
if(conn != null){
conn.close()
}
}
})
}
})
}
def addYin(str: String): String = {
"'" + str + "'"
}
// 字符串添加tab格式化方法
def addTab(str: String): String = {
str + "\t";
}
// 實時流量狀態(tài)更新函數(shù)
val mapFunction = (datehour: String, pv: Option[Long], state: State[Long]) => {
val accuSum = pv.getOrElse(0L) + state.getOption().getOrElse(0L)
val output = (datehour, accuSum)
state.update(accuSum)
output
}
// 獲取jedis連接
def getJedis: Jedis = {
val jedis = RedisPoolUtil.getPool.getResource
jedis
}
// 釋放jedis連接
def closeJedis(jedis: Jedis): Unit = {
RedisPoolUtil.getPool.returnResource(jedis)
}
}
作者:柯廣的網(wǎng)絡日志
微信公眾號:Java大數(shù)據(jù)與數(shù)據(jù)倉庫