第一次写模块-fxqueue

Nodejs cyanprobe 7年前 (2017-03-24) 3697次浏览 已收录 1个评论

前言:

话说这是一枚Teambition的面试+分题,闲(菜)的没事干,就拿过来撸了,一开始什么概念都没有,果断下载Kue,看了Kue的源码和实现。难道我要写高仿Kue咯 :smile: 。不得不叹服人家对redis的把弄和设计,redis队列状态流程借鉴了kue。wujohns 司机问为什么起名为fxqueue,简单来说,不想让大家知道我抄的像kue哈哈。

   传送门:https://github.com/fangker/fxqueue

正文部分:

其实木有什么好说的,之前博文很多关于阻塞列表的方法都有用到。先说下队列状态变化吧。延时-》准备-》活跃-》成功/失败 。在redis中对应 delay,inactive,active,complete/fail。
先扯delay到inactive状态的转变,fxqueue用一个定时器通过’warlock’这个模块锁定保证同时只有一个终端在运行这个任务。不断检测是否超过延期时间超过后改变状态。(PS:写着写着想起个bug shutdown木有把那个定时器给销毁。。。)
inactive到active转变,当任务信息被读取,状态改变。
active到fail:  1.fxqueue用一个定时器通过’warlock’这个模块锁定,不断检测任务ttl是否超过限定时间(被读取任务却没有完成),如果超过限定,状态改变。2.终端job.done()主动传入一个Error对象,状态改变。
active 到complete :执行job.done(),状态改变。
重试则是在fail之后根据参数回到延时或准备状态。
关于任务事件监听:其实是将job对象加入到map中,通过pub/sub ,检索map并触发job对象队列事件。

定时任务

当然,为了抄的不像Kue,我加入了事件键通知,之前博文也扯到。

监听定时任务

queue.ontime('type');    //1
queue.ontime({type:'type',only:true});   //2
//ex:let schemajob=queue.ontime({type:'sendMessage',only:true})

使用.ontime()监听到期通知。当定时通知运行时将返回<promise>对象,可获得id,data。
当使用1方式时,默认消息将被广播所有调用.ontime()方法监听此类型的node终端,都将返回信息。
当使用2方式时,只有正在运行的此终端可以返回信息,其他终端将进入待命状态,当此终端崩溃或者销毁后,其他某个终端将会接替此终端任务,保证此定时任务通知只会触发一个终端使用。
注意: 使用2方式时,可以保证在终端崩溃与下一个终端接入的时间段内触发的定时通知不会丢失。(解决方法会导致产生检测+redis锁时间段内的延迟)

问题在于2方式,ttl过期是把信息广播到所有终端,做法就是先用warlock锁定任务类型为单独触发,其他终端能接收却因为锁存在无法创建本地ob即无法触发。所有终端只有1个终端有ob所以目标达成。
如果终端崩溃,redis TTL在此期间过期的任务将会丢失。我是这样做的。创建键的同时建立一个hash储存信息,并建立定时器监听过期,根据comfirm状态,判断:为正在被处理/过期/丢失 ,如果丢失,则根据hash字段重新建立。schema得到消息就改变确认状态,这样就解决了问题。过期是不可恢复的,而因为奔溃所导致的通知未被终端确认的信息将会被恢复。考虑到这种通知一般是大量的,所以一切失败,提醒的数据将不被纪录。fxqueue只管消息被终端收到,不管终端是否处理成功(也可以保留失败数据,但感觉这种定时通知没有必要,因为不会再处理。超时或者未done处理的都会被默认清理)。
 


CyanProbe , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:第一次写模块-fxqueue
喜欢 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
(1)个小伙伴在吐槽
  1. 加油~
    姜辰2017-03-27 00:33 回复