提要
公司要做实现一套自有的栏目系统替换掉原来的栏目系统,原系统闭源,日活20w左右,需要支持诸如 频道统计,员工完播统计, 客户管理,素材管理,场次管理等功能,基本是复刻原系统营销外的主体功能。刚开始写的时候是2024年,在2025年项目已经完成。
需要解决的问题:
- 需要存储节目播放进度10S同步一次,且需要开场10分钟内实时更新(统计功能部分) 闭源系统原方案 堆数据库Mysql 4台 16核32G 给人看傻了。
- 推理复刻原系统逻辑(这个还好比较容易)
- 原系统不合理的业务架构 CRM系统和栏目系统本身强耦合,估计后期强行拆分,业务服务划分设计不当,导致两个系统需要相互同步,比如开播前的模版消息需要通过CRM配置和审核发送,场次Tag也需要CRM来确认同意生成。
思路(开始前):
对于第一个问题。原检索维度比较多,考虑是否为OLAP ES之类的数据源检索实现,但是分析后他能20s左右延迟同步当日开场播放进度,如果需要数据转换+填充应该比较麻烦实现,后来查看了闭源系统配置,基本是数据库拉满,光这个播放器部分就 4* 16核*32 , 不用怀疑应该是实时update。
实现和优化思路: 加redis hash存储结构用于承接两天内单频道播放进度的存储。live_room_staff=> user live_room_user=> duration 也可针对于单个播放进度Tag建立结构优化检索 对于频道完播Tag维度 可以使用 set 取全量和 tag交集计算人数。redis 10W hash key 全量获取大概只需要1-2S , 完全可以满足需求。然后利用队列异步刷写数据到Mysql,超过两天时间区间改使用Mysql检索。
业务分解过程:
理解运营的需求, 拆解原友米的业务构造,友米依赖企业微信插件作为关联管理用户关系的桥接,这边使用腾讯IM+APP的形式,这样能绕过企业微信的限制。对友米栏目这个归属层级进行弱化,实际业务中这个概念基本用不到,确定主要由 节目->场次 为最终最小用户记录统计单位。整个业务主要划分为3部分 节目部分 客服和用户体系架构 观看记录部分和统计部分。
项目拆分和实现
一、节目部分(直播 / 场次 / 素材 / 节目单)
1. 直播间 LiveRoom
– 增删改查、详情、开关播状态流转(LiveRoomController:create/update/delete/state/getDetail)
– 房间合并(liveRoomMerge)、关联员工/部门查询、关联员工/部门实体(LiveRoomRelationStaff/RelationDepartment)
– 房间分类 LiveRoomClassification 完整 CRUD
2. 直播场次 LiveSession
– 场次 CRUD、审核流转(待审/通过/拒绝,approveSession/getPendingList)
– 定时任务:开播前自动审核(autoApproveSessionBeforeStart)、结束后自动改状态(autoUpdateSessionStateAfterEnd)
– 关联游离场次到房间、清除场次观看数据(resetViewData)
3. 直播事件 LiveEvent
– 事件 CRUD、关联游离事件到场次/房间
4. 素材管理 Material
– 素材分类 CRUD + 批量排序(MaterialClassification:batchUpdateSort)
– 素材 CRUD、CDN 上传、首帧提取(uploadImageToCdn/getFirstFrame)、分类带素材聚合返回
5. 素材问答 MaterialQuestion
– 后台查看/更新作答记录(LiveMaterialQuestionController:getRecord/updateRecord)
6. 节目单 PlayBill
– 节目单 CRUD、节目单下挂素材记录(PlayBillRecord)增删改查、索引追加、续期刷新(renewPlayBill)
7. 用户侧(C 端,controller/api/v2/live/)
– 用户房间绑定检查、获取 CDN 播放地址、直播间/场次信息、设置观看日期、完成观看判定(UserLiveRoomController)
– 用户问答列表、提交作答、完成判定(UserLiveQuestionController)
—
二、客服和用户体系架构
1. 客户/客服归属与组织
– 客服(员工)CRUD、组织部门树管理(OrganizationServices)
– 客户-客服归属建立与校验、客户整批转移(含 IM 好友迁移,transferCustomerRelations)
– 客户绑定/切换直播频道(单个、批量、按客服名下批量)
– 附加备注/标签、等级标签(CustomerStaffAttach/StaffTag/StaffSetCustomerLevelTag)
– 客户动态时间线(绑定/换频道日志)、客户列表/导出 web
2. 员工端 staff_client(route/staff.php)
– 员工登录、名下直播间、名下客户分页(区分短/长期)
– 用户观看数据、客户详情、客户答题明细
– 单条/批量改备注、按员工查学分/活跃分
– 标签体系:等级标签、自定义标签 CRUD、批量打标签、按标签筛人
– 客户时间线、待办增删查、主动发消息与邀请发送(SendMessageController)
3. 栏目推送系统 lanmu(客服→用户触达核心)
– 推送模板(文本/图/视频/自定义 + 分类树 + 预览)
– 策略与标签、按素材取规则并合并默认规则(mergeRules,支持「直播前/后/结束/次日」时间计算)
– 任务创建(绑定直播场次)、按员工拆 TaskItem 批量发送、失败统计与重试、自动确认、配置项
– 员工企微绑定关系管理、登录码生成/校验/下载/核销、待办推送、直播场次自动邀约
4. 客服工作台 / 腾讯 IM
– 会话列表、聊天记录、转接客服、客服登录(账号/扫码/微信)
– IM 账号导入、资料同步、加/删好友、改备注、单聊/批量发消息(sendSingleChatBatchMessage)
– 好友/资料同步事件处理与绑定变更同步(ImSyncService)
5. 企业微信对接 wework
– 多企业管理、事件/部门同步、员工同步/激活
– 企微客户拉取/详情/删除同步、按员工查客户
– 企微标签组、部门树、订单与激活码、员工激活信息同步
6. 用户体系 user
– 资料、地址、发票、反馈、搜索/访问记录、好友、充值、提现、佣金(含冻结)、积分、签到、卡券
– 分组、标签与分类、会员等级、用户-直播间关系、批量处理、注销
—
三、观看记录与统计部分
1. 观看记录采集(C 端写入)
– 进入场次即建 UserLiveSession 记录、绑定直播间落库
– 播放进度分档上报:0–25/25–50/50–75/75+ 更新
tag,推进完成态、在线/离线、参与度(heartUpdateRecord/userSessionUpdate)
– 问卷作答落库 LiveSessionUserQuestion、素材播放完成/活跃日期回传
2. 观看记录查看(后台)
– 用户观看历史(getUserViewHistory)、直播间用户列表、场次观看数据重置
– 节目单记录、素材记录管理
3. 观看记录查看(员工端)
– 用户观看数据(按 uid+直播间+时间区间)、客户详情(含活跃分折算)、客户答题明细
– 客户学分列表(关联 live_user_credit,多条件过滤)、备注维护
4. 素材问答作答记录 / 节目单记录
– MaterialQuestionRecord(素材维度)与 LiveSessionUserQuestion(会话维度)两套并行结构,后台均可查
核心点
观看记录部分,因为涉及到统计,客服关系,部分关系,频道关系等一系列维度,放弃使用上面redis组织sql刷盘,redis只承载最新记录和观看进度的更新,定期实施刷盘策略,由观看记录连表来展示实际的统计需求,并且按照观看时间按月分片观看记录。由于实际业务10w用户级别暂时没有出现性能问题(之间踩过一些坑)。这部分在crmeb框架的基础上开发。后续PHP版本在更新和在观看记录预生成部分由于thinkphp-queue的问题,和php端处理效率问题。改用go-kratos框架来实现使用asynq队列,以解决问题。
问题解析
https://github.com/top-think/think-queue 观看进度下游消费不及时导致的严重占用带宽问题PHP生产端投递太多,消费端消费效率不够。pop实现的时候每次会尝试移动延迟任务, 到时移动延迟任务的时候,由于消息积压非常严重会导致每个线程都尝试移动所有积压队列。表现为iftop抓包了,redis那边青鸟几百M,就是redis 返回的数据特别大,带宽直接被打满。这个组件实现非常古早,而且实测性能很差,纯粹是热点观看时间进来过多消息,导致积压问题不断放大,只能做200/S这种平缓投递。
ingress路由分离下俩部分到go服务。
一、生成观看记录(用户会话预生成)
触发时机:直播场次审核通过时(liveroom_approve.go:351),为该直播间所有关联用户预生成 EbLiveUserSession 记录。
入队流程 (biz/liveroom_approve.go:473 enqueueLiveSessionGenTask)
– 查询直播间关联用户 → 按 1000 人/批 分批
– 每批构造 UserLiveSessionGenPayload,经 Producer.EnqueueLiveSessionGenTask(asynq 异步任务队列)投递
– 入队失败有重试(3 次,200ms 间隔,10s 上限)
消费流程 (service/task/livesessiongen/livesessiongen.processor.go:40 ProcessTask)
1. Redis 去重:用 HSET livesessiongen_dedup:{sessionId} 过滤已生成的用户(Redis 失败则降级靠 DB 唯一约束)
2. 批量插入:EbLiveUserSession.CreateBulk,初始值:Tag=not_view、PlayDuration/TotalDuration/State/OnlineState/Partic
ipateState/Times=0、FirstPlayTime=场次开始时间、LastUpdateScene=go-gen
3. 写去重标记(HSET,24h 过期)→ 预热会话缓存(140s TTL)
4. 全程埋点性能日志(队列等待时间、插入耗时、处理速率)
设计要点:场次开始前就把所有用户记录建好,后续进度更新只需改值,避免开场高并发写入。
—
二、更新观看进度
入口:service/liveroom.go:28 CompleteLiveSession → biz/liveroom.go:171
核心逻辑:
1. 定位会话:FindLiveSessionRecordById 取场次(含 Length 视频时长)
2. 读取当前记录:先查 Redis(getRecordCacheKey),miss 则查 DB(EbLiveUserSession),都没有返回
3. 计算进度:playedPercentage = playDuration / length × 100
4. 字段更新规则(只升不降的设计):
– Tag:按百分比分档 level1(0–25%)→ level2(25–50%)→ level3(50–75%)→ level4(75%+),经 getUpperTag 只取更高档
– State:99–100% 置 STATE_COMPLETE
– TotalDuration:仅在大于旧值时更新
– OnlineState:离线→在线;ParticipateState:初始→参与;FirstPlayTime:空则补当前时间
5. 落库判断:needCompleteUpdate(档位/状态变化)或 Event=close|finish 或测试模式 → 更新 DB
– 关键:用 context.Background() 干净 context 落库,仅透传 trace_id,避免 ctx 中残留事务污染(ent.TxFromContext
多处检测)
– LastUpdateScene=complate_update
– 达到 level4(75%+)额外发积分 + 学分(saveLiveUserIntegral / saveLiveUserCredit,同样用干净 ctx)
6. 心跳异步同步:抢 HeartbeatLockKey 锁成功 → 投递延迟 220s 的心跳任务(EnqueueHeartUpdateTask),由 ReflushData
把缓存数据最终刷回 DB

