Skip to content
Snippets Groups Projects
Commit 1dc8d53cf87a authored by INADA Naoki's avatar INADA Naoki
Browse files

pool: Add max_open_connections option

parent 48fe75894674
Branches
No related tags found
No related merge requests found
......@@ -2,8 +2,7 @@
from __future__ import absolute_import, division, print_function
from collections import deque
import sys
import warnings
from tornado.ioloop import IOLoop
from tornado.gen import coroutine, Return
......@@ -6,6 +5,7 @@
import warnings
from tornado.ioloop import IOLoop
from tornado.gen import coroutine, Return
from tornado.concurrent import Future
from tornado.concurrent import Future, chain_future
from tornado_mysql import connect
......@@ -11,4 +11,5 @@
from tornado_mysql import connect
from tornado_mysql.connections import Connection
DEBUG = False
......@@ -32,9 +33,10 @@
connect_kwargs,
max_idle_connections=1,
max_recycle_sec=3600,
max_open_connections=0,
io_loop=None,
):
"""
:param dict connect_kwargs: kwargs for tornado_mysql.connect()
:param int max_idle_connections: Max number of keeping connections.
:param int max_recycle_sec: How long connections are recycled.
......@@ -35,10 +37,12 @@
io_loop=None,
):
"""
:param dict connect_kwargs: kwargs for tornado_mysql.connect()
:param int max_idle_connections: Max number of keeping connections.
:param int max_recycle_sec: How long connections are recycled.
:param int max_open_connections:
Max number of opened connections. 0 means no limit.
"""
connect_kwargs['autocommit'] = True
self.io_loop = io_loop or IOLoop.current()
self.connect_kwargs = connect_kwargs
......@@ -41,9 +45,10 @@
"""
connect_kwargs['autocommit'] = True
self.io_loop = io_loop or IOLoop.current()
self.connect_kwargs = connect_kwargs
self.max_idle_connections = max_idle_connections
self.max_idle = max_idle_connections
self.max_open = max_open_connections
self.max_recycle_sec = max_recycle_sec
self._opened_conns = 0
self._free_conn = deque()
......@@ -46,7 +51,11 @@
self.max_recycle_sec = max_recycle_sec
self._opened_conns = 0
self._free_conn = deque()
self._waitings = deque()
def stat(self):
return (self._opened_conns, len(self._free_conn), len(self._waitings))
def _get_conn(self):
now = self.io_loop.time()
......@@ -50,8 +59,10 @@
def _get_conn(self):
now = self.io_loop.time()
# Try to reuse in free pool
while self._free_conn:
conn = self._free_conn.popleft()
if now - conn.connected_time > self.max_recycle_sec:
self._close_async(conn)
continue
......@@ -53,10 +64,10 @@
while self._free_conn:
conn = self._free_conn.popleft()
if now - conn.connected_time > self.max_recycle_sec:
self._close_async(conn)
continue
_debug("Reusing connection from pool (opened=%d)" % (self._opened_conns,))
_debug("Reusing connection from pool:", self.stat())
fut = Future()
fut.set_result(conn)
return fut
......@@ -59,5 +70,7 @@
fut = Future()
fut.set_result(conn)
return fut
# Open new connection
if self.max_open and self._opened_conns < self.max_open:
self._opened_conns += 1
......@@ -63,4 +76,4 @@
self._opened_conns += 1
_debug("Creating new connection (opened=%d)" % (self._opened_conns,))
_debug("Creating new connection:", self.stat())
return connect(**self.connect_kwargs)
......@@ -65,3 +78,8 @@
return connect(**self.connect_kwargs)
# Wait to other connection is released.
fut = Future()
self._waitings.append(fut)
return fut
def _put_conn(self, conn):
......@@ -67,3 +85,3 @@
def _put_conn(self, conn):
if (len(self._free_conn) < self.max_idle_connections and
if (len(self._free_conn) < self.max_idle and
self.io_loop.time() - conn.connected_time < self.max_recycle_sec):
......@@ -69,2 +87,7 @@
self.io_loop.time() - conn.connected_time < self.max_recycle_sec):
if self._waitings:
fut = self._waitings.popleft()
fut.set_result(conn)
_debug("Passing returned connection to waiter:", self.stat())
else:
self._free_conn.append(conn)
......@@ -70,5 +93,6 @@
self._free_conn.append(conn)
_debug("Add conn to free pool:", self.stat())
else:
self._close_async(conn)
def _close_async(self, conn):
......@@ -71,9 +95,8 @@
else:
self._close_async(conn)
def _close_async(self, conn):
self.io_loop.add_future(conn.close_async(), callback=lambda f: None)
self._opened_conns -= 1
self.io_loop.add_future(conn.close_async(), callback=self._after_close)
def _close_conn(self, conn):
conn.close()
......@@ -77,4 +100,13 @@
def _close_conn(self, conn):
conn.close()
self._after_close()
def _after_close(self, fut=None):
if self._waitings:
fut = self._waitings.popleft()
conn = Connection(**self.connect_kwargs)
cf = conn.connect()
self.io_loop.add_future(cf, callback=lambda f: fut.set_result(conn))
else:
self._opened_conns -= 1
......@@ -80,4 +112,5 @@
self._opened_conns -= 1
_debug("Connection closed:", self.stat())
@coroutine
def execute(self, query, params=None):
......@@ -94,4 +127,8 @@
cur = conn.cursor()
yield cur.execute(query, params)
yield cur.close()
except:
self._close_conn(conn)
raise
else:
self._put_conn(conn)
......@@ -97,8 +134,4 @@
self._put_conn(conn)
except:
self._opened_conns -= 1
conn.close()
raise
raise Return(cur)
@coroutine
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment