generic-pool v2 数据库连接池源码分析

Nodejs cyanprobe 7年前 (2017-02-02) 6037次浏览 已收录 7个评论

前言:

年前在北京往回跑的时候,笔兄又在问连接池的问题,尼玛我怎么会。连接池要不要释放,不释放又会如何。年后闲着没事看了generic-pool的源码,原本准备打印出来看,结果打印店房间了就用pad看,然后竟然看到了js写的链表,为了降低理解难度(毕竟是菜逼)看了version 2一个单文件完成的降级版本。generic-pool: https://github.com/coopernurse/node-pool

实现流程:

为了帮助理解先把代码注释放一下,天生英语残,看英文解释看不懂,看代码结合草根英语翻译…

  // defaults
 //一个未使用链接的最大存活时间
 factory.idleTimeoutMillis = factory.idleTimeoutMillis || 30000
 //如果为true最近释放的链接将允许被再次分配
 factory.returnToHead = factory.returnToHead || false
 //空闲资源在或低于最小阈值时应被销毁/重新创建。默认 true
 factory.refreshIdle = ('refreshIdle' in factory) ? factory.refreshIdle : true
 //检查连接是否超时时间
 factory.reapInterval = factory.reapIntervalMillis || 1000
 //优先级1-x数值越小优先值越高,这里是队列限制值,用于初始化
 factory.priorityRange = factory.priorityRange || 1
/* 一个函数,如果连接池资源是正常的返回函数return true 如果资源不正常将会被销毁。这里应该写逻辑 arguments 传入连接对象,使用者判断是否正常,如果
不正常返回false,此连接将会被销毁。
 这货在acquire()中被调用,在从连接池获得资源之前。*/
 factory.validate = factory.validate || function () { return true }
//最大最小连接数
 factory.max = parseInt(factory.max, 10)
 factory.min = parseInt(factory.min, 10)
 this._factory = factory
//使用的对象数组
 this._inUseObjects = []
 this._draining = false
 //初始化队列
 this._waitingClients = new PriorityQueue(factory.priorityRange)
 //可获取的链接
 this._availableObjects = []
 this._asyncTestObjects = []
 //活跃连接数
 this._count = 0
 this._removeIdleTimer = null
 this._removeIdleScheduled = false

实现优先级队列:

用户通过priorityRange设置优先级区段,version3应该是实现了一个链表结构(等我弄清楚的..),这里通过PriorityQueue类创建一个数组,数组长度为priorityRange设定值。初始化完成这个样子Array[[],[],[]] 此时我们拥有了3个优先级,当我们调用pool.acquire时可以设定优先,默认为0。也就是说在没有设置优先值的前提下我们只会用到第一个数组。

连接池补充(初始化):

_ensureMinimum 来实现连接池初始化,如果小于min值,通过调用_createResource函数来补充连接至最小值。creatResource :将connection压入inUseObjects[],同时调用addResourceToAvailableObjects给连接对象封装成objWithTimeout对象,增加timeout属性 例- objWithTimeout = {obj: obj,timeout:timeout}  并压入availableObjects[] 。紧接着调用dispense函数,因为初始化过程中队列不存在对象,直接return。到此初始化完成。

使用连接池调用过程:

Pool.prototype.acquire = function acquire (callback, priority) {
 if (this._draining) {
 throw new Error('pool is draining and cannot accept work')
 }
 if (process.domain) {
 callback = process.domain.bind(callback)
 }
//1
 this._waitingClients.enqueue(callback, priority)
//2
 this._dispense()
 return (this._count < this._factory.max)
}

这里的callback就是acquire里的query,重要的就1,2。先看1.

PriorityQueue.prototype.enqueue = function enqueue (obj, priority) {
var priorityOrig
// Convert to integer with a default value of 0.
priority = priority && +priority | 0 || 0
// Clear cache for total.
this._total = null
if (priority) {
priorityOrig = priority
if (priority < 0 || priority >= this._size) {
priority = (this._size - 1)
// put obj at the end of the line
console.error('invalid priority: ' + priorityOrig + ' must be between 0 and ' + priority)
}
}
this._slots[priority].push(obj)
}

 
从入队源码可以看出实现优先级应该怎么办,没错就是分配。来看2。

while (this._availableObjects.length > 0) {
this._log('dispense() - reusing obj', 'verbose')
objWithTimeout = this._availableObjects[0]
//不合法处理,弱返回true执行销毁
if (!this._factory.validate(objWithTimeout.obj)) {
this.destroy(objWithTimeout.obj)
continue
}
//available出栈
this._availableObjects.shift()
//inUse入栈
this._inUseObjects.push(objWithTimeout.obj)
//链接对象出队
clientCb = this._waitingClients.dequeue()
return clientCb(null, objWithTimeout.obj)
}
//继续补充连接
if (this._count < this._factory.max) {
this._createResource()
}

注意出队,也是优先级实现的一部分,slots中越往后的越慢出栈。代码如下:

PriorityQueue.prototype.dequeue = function dequeue (callback) {
 var obj = null
 // Clear cache for total.
 this._total = null
 for (var i = 0, sl = this._slots.length; i < sl; i += 1) {
 //如果队列存在则出栈
 if (this._slots[i].length) {
 obj = this._slots[i].shift()
 break
 }
 }
 return obj
}

去除过期失效的链接

_scheduleRemoveIdle函数在addResourceToAvailableObjects中被调用,去除(执行destroy)完成后,若可用连接对象不为0则继续按时检查可用连接对象是否过期。

连接复用问题:

从availableObj到inUseObj的队列中,在执行pool.release(client)之后,连接被释放,这里的代码是这样的。

 Pool.prototype.release = function release (obj) {
 // check to see if this object has already been released (i.e., is back in the pool of this._availableObjects)
 if (this._availableObjects.some(function (objWithTimeout) { return (objWithTimeout.obj === obj) })) {
 this._log('release called twice for the same resource: ' + (new Error().stack), 'error')
 return
 }
 // check to see if this object exists in the `in use` list and remove it
 var index = this._inUseObjects.indexOf(obj)
 if (index < 0) {
 this._log('attempt to release an invalid resource: ' + (new Error().stack), 'error')
 return
 }
 // this._log("return to pool")
 this._inUseObjects.splice(index, 1)
 this._addResourceToAvailableObjects(obj)
}

 
那么问题来了,在第一个判断不断的抛出错误,看释放的链接是否在可用连接队列里。这里一直返回true,问题就是这样,暂没有找到作者为毛这样写的依据(已解决,我是智障。。)。连接的复用其实就是从inUseObj搬家到availableObj的过程。看了源码之后感觉这里有个问题。复用过程中连接是否超过生存期的问题,如果再次复用,会重新设定时间戳,其实这个连接已经活了很久了。这是一个通用连接池,所以关于连接是否有效还需要拿出处理,这不在讨论范围内。这是一个比较简单的demo,有空version3看一看。

后记:

有很多很多基础问题木有解决。!! 先各种做一些小实验压压惊。。通过分析这个数据库连接池源码,搞懂了通用连接池内部是怎么实现功能的。写可能还不行,今后半年咸鱼时间都用来看模块熟悉大神们的笔迹吧哈哈。再过几天去北京了,献给我一个寒假最后一天可以自己支配的安静的下午,晚安~
 
 
 
 


CyanProbe , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:generic-pool v2 数据库连接池源码分析
喜欢 (1)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
(7)个小伙伴在吐槽
  1. 不是代码牛表示看不懂,不过“打印店房间了”应该是放假了吧?
    姜辰2017-02-02 20:43 回复
  2. 看不懂啊。。
    2017-02-04 11:47 回复
  3. 以后会常来逛逛,博客很棒。
    Feeey个人博客2017-02-11 07:32 回复
  4. 阿西吧,我看不懂!老司机带带我吧 :lol:
    重庆游戏人2017-02-16 11:35 回复
  5. 博主最近没怎么更新啊,我来转转。
    Feeey个人博客2017-02-28 01:37 回复
  6. 大王叫我来巡山,我把博客转一转。(*^__^*) 嘻嘻……
    Feeey个人博客2017-03-12 00:36 回复