
simplify pool recycle logic #2985


Closed
sqlalchemy-bot opened this issue Mar 5, 2014 · 5 comments
Labels
bug Something isn't working high priority
Comments

@sqlalchemy-bot
Collaborator

Migrated issue, originally created by Michael Bayer (@zzzeek)

Using a simple invalidation time we can do away with all the "pool replacement" logic. The current logic is subject to a fairly obvious race condition: when many connections all hit a disconnect wall at once, each of the Connection objects hosting them will simultaneously call upon self.engine.dispose(). This means we could have N pools generated and immediately discarded within a single disconnect cycle.

The patch below removes all of that and replaces it with a simple timeout, which incurs no overhead and no race conditions. The only difference is that the "bad" connections hang around until they are invalidated on checkout.
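The "generational timestamp" idea in the patch can be sketched standalone. This is a minimal illustration with made-up class names (`Pool`/`Record`), not the actual SQLAlchemy `Pool`/`_ConnectionRecord` API: an invalidation records a timestamp, and any record created before that timestamp reconnects on its next checkout.

```python
import time

class Pool:
    """Minimal sketch of a pool with a generational invalidation time."""

    def __init__(self, creator):
        self.creator = creator
        self.invalidate_time = 0

    def _invalidate(self, rec):
        # Only advance the timestamp if this record belongs to an older
        # generation; concurrent invalidations in the same generation
        # are no-ops, so no replacement pools are ever created.
        if self.invalidate_time < rec.starttime:
            self.invalidate_time = time.time()

class Record:
    """Minimal sketch of a connection record held by the pool."""

    def __init__(self, pool):
        self.pool = pool
        self.connection = pool.creator()
        self.starttime = time.time()

    def get_connection(self):
        # Recycle lazily on checkout if the pool was invalidated
        # after this connection was established.
        if self.pool.invalidate_time > self.starttime:
            self.connection = self.pool.creator()
            self.starttime = time.time()
        return self.connection
```

Because only the first `_invalidate()` call per generation advances the timestamp, a storm of simultaneous disconnects is idempotent, rather than each one triggering its own pool replacement.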

diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index 888a15f..20b5227 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -1084,9 +1084,7 @@ class Connection(Connectable):
                 del self._is_disconnect
                 dbapi_conn_wrapper = self.connection
                 self.invalidate(e)
-                if not hasattr(dbapi_conn_wrapper, '_pool') or \
-                        dbapi_conn_wrapper._pool is self.engine.pool:
-                    self.engine.dispose()
+                self.engine.pool._invalidate(dbapi_conn_wrapper)
             if self.should_close_with_result:
                 self.close()
 
@@ -1496,7 +1494,7 @@ class Engine(Connectable, log.Identified):
         the engine are not affected.
 
         """
-        self.pool = self.pool._replace()
+        self.pool.dispose()
 
     def _execute_default(self, default):
         with self.contextual_connect() as conn:
diff --git a/lib/sqlalchemy/orm/strategies.py b/lib/sqlalchemy/orm/strategies.py
index 473b665..4a07e78 100644
--- a/lib/sqlalchemy/orm/strategies.py
+++ b/lib/sqlalchemy/orm/strategies.py
@@ -528,7 +528,6 @@ class LazyLoader(AbstractRelationshipLoader):
     def _emit_lazyload(self, strategy_options, session, state, ident_key, passive):
         q = session.query(self.mapper)._adapt_all_clauses()
 
-
         if self.parent_property.secondary is not None:
             q = q.select_from(self.mapper, self.parent_property.secondary)
 
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py
index af9b8fc..f78825e 100644
--- a/lib/sqlalchemy/pool.py
+++ b/lib/sqlalchemy/pool.py
@@ -210,6 +210,7 @@ class Pool(log.Identified):
         self._threadconns = threading.local()
         self._creator = creator
         self._recycle = recycle
+        self._invalidate_time = 0
         self._use_threadlocal = use_threadlocal
         if reset_on_return in ('rollback', True, reset_rollback):
             self._reset_on_return = reset_rollback
@@ -276,6 +277,22 @@ class Pool(log.Identified):
 
         return _ConnectionRecord(self)
 
+    def _invalidate(self, connection):
+        """Mark all connections established within the generation
+        of the given connection as invalidated.
+
+        If this pool's last invalidate time is before when the given
+        connection was created, update the timestamp til now.  Otherwise,
+        no action is performed.
+
+        Connections with a start time prior to this pool's invalidation
+        time will be recycled upon next checkout.
+        """
+        rec = getattr(connection, "_connection_record", None)
+        if not rec or self._invalidate_time < rec.starttime:
+            self._invalidate_time = time.time()
+
+
     def recreate(self):
         """Return a new :class:`.Pool`, of the same class as this one
         and configured with identical creation arguments.
@@ -301,17 +318,6 @@ class Pool(log.Identified):
 
         raise NotImplementedError()
 
-    def _replace(self):
-        """Dispose + recreate this pool.
-
-        Subclasses may employ special logic to
-        move threads waiting on this pool to the
-        new one.
-
-        """
-        self.dispose()
-        return self.recreate()
-
     def connect(self):
         """Return a DBAPI connection from the pool.
 
@@ -483,6 +489,7 @@ class _ConnectionRecord(object):
         self.connection = None
 
     def get_connection(self):
+        recycle = False
         if self.connection is None:
             self.connection = self.__connect()
             self.info.clear()
@@ -493,6 +500,15 @@ class _ConnectionRecord(object):
             self.__pool.logger.info(
                     "Connection %r exceeded timeout; recycling",
                     self.connection)
+            recycle = True
+        elif self.__pool._invalidate_time > self.starttime:
+            self.__pool.logger.info(
+                    "Connection %r invalidated due to pool invalidation; recycling",
+                    self.connection
+                    )
+            recycle = True
+
+        if recycle:
             self.__close()
             self.connection = self.__connect()
             self.info.clear()
@@ -911,8 +927,6 @@ class QueuePool(Pool):
         try:
             wait = use_overflow and self._overflow >= self._max_overflow
             return self._pool.get(wait, self._timeout)
-        except sqla_queue.SAAbort as aborted:
-            return aborted.context._do_get()
         except sqla_queue.Empty:
             if use_overflow and self._overflow >= self._max_overflow:
                 if not wait:
@@ -974,12 +988,6 @@ class QueuePool(Pool):
         self._overflow = 0 - self.size()
         self.logger.info("Pool disposed. %s", self.status())
 
-    def _replace(self):
-        self.dispose()
-        np = self.recreate()
-        self._pool.abort(np)
-        return np
-
     def status(self):
         return "Pool size: %d  Connections in pool: %d "\
                 "Current Overflow: %d Current Checked out "\
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py
index fc6f3dc..cde19b3 100644
--- a/test/engine/test_pool.py
+++ b/test/engine/test_pool.py
@@ -1069,7 +1069,8 @@ class QueuePoolTest(PoolTestBase):
                 # inside the queue, before we invalidate the other
                 # two conns
                 time.sleep(.2)
-                p2 = p._replace()
+                p._invalidate(c2)
+                c2.invalidate()
 
                 for t in threads:
                     t.join(join_timeout)
@@ -1079,19 +1080,18 @@ class QueuePoolTest(PoolTestBase):
     @testing.requires.threading_with_mock
     def test_notify_waiters(self):
         dbapi = MockDBAPI()
+
         canary = []
-        def creator1():
+        def creator():
             canary.append(1)
             return dbapi.connect()
-        def creator2():
-            canary.append(2)
-            return dbapi.connect()
-        p1 = pool.QueuePool(creator=creator1,
+        p1 = pool.QueuePool(creator=creator,
                            pool_size=1, timeout=None,
                            max_overflow=0)
-        p2 = pool.NullPool(creator=creator2)
+        #p2 = pool.NullPool(creator=creator2)
         def waiter(p):
             conn = p.connect()
+            canary.append(2)
             time.sleep(.5)
             conn.close()
 
@@ -1104,12 +1104,14 @@ class QueuePoolTest(PoolTestBase):
             threads.append(t)
         time.sleep(.5)
         eq_(canary, [1])
-        p1._pool.abort(p2)
+
+        c1.invalidate()
+        p1._invalidate(c1)
 
         for t in threads:
             t.join(join_timeout)
 
-        eq_(canary, [1, 2, 2, 2, 2, 2])
+        eq_(canary, [1, 1, 2, 2, 2, 2, 2])
 
     def test_dispose_closes_pooled(self):
         dbapi = MockDBAPI()
diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py
index ba336a1..a3ad9c5 100644
--- a/test/engine/test_reconnect.py
+++ b/test/engine/test_reconnect.py
@@ -146,16 +146,20 @@ class MockReconnectTest(fixtures.TestBase):
         # close shouldnt break
 
         conn.close()
-        is_not_(self.db.pool, db_pool)
-
-        # ensure all connections closed (pool was recycled)
 
+        # ensure one connection closed...
         eq_(
             [c.close.mock_calls for c in self.dbapi.connections],
-            [[call()], [call()]]
+            [[call()], []]
         )
 
         conn = self.db.connect()
+
+        eq_(
+            [c.close.mock_calls for c in self.dbapi.connections],
+            [[call()], [call()], []]
+        )
+
         conn.execute(select([1]))
         conn.close()
 
@@ -534,8 +538,6 @@ class RealReconnectTest(fixtures.TestBase):
         # invalidate() also doesn't screw up
         assert_raises(exc.DBAPIError, engine.connect)
 
-        # pool was recreated
-        assert engine.pool is not p1
 
     def test_null_pool(self):
         engine = \

@sqlalchemy-bot
Collaborator Author

Michael Bayer (@zzzeek) wrote:

Test case: in this test, the current method calls engine.dispose() about seven times, and seven pools are created. Using the new system, only one pool is created.

from sqlalchemy.pool import QueuePool
from sqlalchemy import create_engine, exc
import mock
import time
import threading
import random

def slow_close():
    slow_closing_connection._slow_close()
    time.sleep(.5)

slow_closing_connection = mock.Mock()
slow_closing_connection.connect.return_value = mock.Mock()
slow_closing_connection.connect.return_value.close = slow_close

class Error(Exception):
    pass

dialect = mock.Mock()
dialect.is_disconnect = lambda *arg, **kw: True
dialect.dbapi.Error = Error

pools = []
class TrackQueuePool(QueuePool):
    def __init__(self, *arg, **kw):
        pools.append(self)
        super(TrackQueuePool, self).__init__(*arg, **kw)

def creator():
    return slow_closing_connection.connect()
p1 = TrackQueuePool(creator=creator, pool_size=20)

eng = create_engine("postgresql://", pool=p1, _initialize=False)
eng.dialect = dialect

# 15 total connections
conns = [eng.connect() for i in range(15)]


# return 7 back to the pool
for conn in conns[3:10]:
    conn.close()

def attempt(conn):
    time.sleep(random.random())
    try:
        conn._handle_dbapi_exception(Error(), "statement", {}, mock.Mock(), mock.Mock())
    except exc.DBAPIError:
        pass

# run an error + invalidate operation on the connections
threads = []
for conn in conns:
    t = threading.Thread(target=attempt, args=(conn, ))
    t.start()
    threads.append(t)

for t in threads:
    t.join()

# return all 15 connections to the pool
for conn in conns:
    conn.close()

# re-open 15 total connections
conns = [eng.connect() for i in range(15)]

# 15 connections have been fully closed due to invalidate
assert slow_closing_connection._slow_close.call_count == 15

# 15 initial connections + 15 reconnections
assert slow_closing_connection.connect.call_count == 30
assert len(pools) <= 2, len(pools)



@sqlalchemy-bot
Collaborator Author

Michael Bayer (@zzzeek) wrote:

  • A major improvement made to the mechanics by which the :class:`.Engine`
    recycles the connection pool when a "disconnect" condition is detected;
    instead of discarding the pool and explicitly closing out connections,
    the pool is retained and a "generational" timestamp is updated to
    reflect the current time, thereby causing all existing connections
    to be recycled when they are next checked out.  This greatly simplifies
    the recycle process, removes the need for "waking up" connect attempts
    waiting on the old pool, and eliminates the race condition whereby many
    immediately-discarded "pool" objects could be created during the
    recycle operation.  fixes #2985

eed9cfc

@sqlalchemy-bot
Collaborator Author

Changes by Michael Bayer (@zzzeek):

  • changed status to closed

@sqlalchemy-bot
Collaborator Author

Michael Bayer (@zzzeek) wrote:

  • restore the old behavior of the connection pool replacing itself just
    within userland engine.dispose(); since some SQLA tests already failed when
    the replace step was removed, due to those connections still being
    referenced, it's likely its removal would create surprises for all those
    users who incorrectly use dispose(), and it's not really worth dealing
    with.  This doesn't affect the change we made for ref #2985.

8e10ab9

@sqlalchemy-bot
Collaborator Author

Michael Bayer (@zzzeek) wrote:

  • Fixed some "double invalidate" situations that were detected, where
    a connection invalidation could occur within an already critical section
    such as connection.close(); ultimately, these conditions are caused
    by the change in #2907, in that the "reset on return" feature
    calls out to the Connection/Transaction in order to handle it, where
    "disconnect detection" might be caught.  However, it's possible that
    the more recent change in #2985 made this more likely to be seen,
    as the "connection invalidate" operation is much quicker; the issue
    is more reproducible on 0.9.4 than on 0.9.3.

Checks are now added within any section where
an invalidate might occur, to halt further disallowed operations
on the invalidated connection.  This includes two fixes, one at the
engine level and one at the pool level.  While the issue was observed
with highly concurrent gevent cases, it could in theory occur in
any kind of scenario where a disconnect occurs within the connection
close operation.
fixes #3043
ref #2985
ref #2907

  • add some defensive checks during an invalidate situation:
  1. _ConnectionRecord.invalidate might be called twice within finalize_fairy
    if the _reset() raises an invalidate condition, invalidates, raises, and
    then goes to invalidate the record again; so check for this.
  2. similarly, within Connection, any time we call handle_dbapi_error(), we
    might become invalidated, so a following finally must check self.__invalid
    before dealing with the connection any further.

85d1899
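The defensive check described in the first bullet can be sketched as follows. This is an illustrative standalone class, not SQLAlchemy's actual `_ConnectionRecord` internals: the key point is that a re-entered invalidate must be a no-op rather than closing the underlying DBAPI connection a second time.

```python
class ConnectionRecord:
    """Minimal sketch of a record whose invalidate() is re-entry safe."""

    def __init__(self, creator):
        self.connection = creator()

    def invalidate(self):
        if self.connection is None:
            # Already invalidated -- e.g. invalidate() re-entered because
            # the reset-on-return step itself raised a disconnect.
            # Skip the double close.
            return
        self.connection.close()
        self.connection = None
```

The guard makes the second call through this path harmless, which is what allows an invalidate raised inside connection.close() to unwind cleanly.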
