2017-02-09 4 views
2

다음 오류가 발생합니다 (응용 프로그램에서 포크로 인해 발생한다고 가정 함). "이 결과 개체는 행을 반환하지 않습니다."포킹, sqlalchemy 및 범위가 지정된 세션

Traceback 
--------- 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/dask/async.py", line 263, in execute_task 
result = _execute_task(task, data) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/dask/async.py", line 245, in _execute_task 
return func(*args2) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/smg/analytics/services/impact_analysis.py", line 140, in _do_impact_analysis_mp 
Correlation.user_id.in_(user_ids)).all()) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2241, in all 
return list(self) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 65, in instances 
fetch = cursor.fetchall() 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 752, in fetchall 
self.cursor, self.context) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1027, in _handle_dbapi_exception 
util.reraise(*exc_info) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 746, in fetchall 
l = self.process_rows(self._fetchall_impl()) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 715, in _fetchall_impl 
self._non_result() 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 720, in _non_result 
"This result object does not return rows. " 

나는 DASK를 사용하고 있는데 (multiprocessing.Pool를 사용하는) 스케줄러 멀티 프로세싱입니다. (문서를 기반으로) 이해 했으므로 범위가 지정된 세션 객체 (scoped_session()을 통해 생성)에서 생성 된 세션은 스레드 세이프입니다. 이것은 스레드 로컬이기 때문입니다. 이것은 내가 Session() (또는 프록시 Session를 사용하여) 호출했을 때만 호출되는 스레드에서만 액세스 할 수있는 세션 객체를 얻게된다고 믿게합니다. 이것은 꽤 솔직하게 보입니다.

내가 혼란스러워하는 이유는 프로세스를 포크 할 때 왜 문제가 있는지입니다. (워드 프로세서 기준)

class _DB(object): 

    _engine = None 

    @classmethod 
    def _get_engine(cls, force_new=False): 
     if cls._engine is None or force_new is True: 
      cfg = Config.get_config() 
      user = cfg['USER'] 
      host = cfg['HOST'] 
      password = cfg['PASSWORD'] 
      database = cfg['DATABASE'] 
      engine = create_engine(
       'mysql://{}:{}@{}/{}?local_infile=1&' 
       'unix_socket=/var/run/mysqld/mysqld.sock'. 
        format(user, password, host, database), 
       pool_size=5, pool_recycle=3600) 
      cls._engine = engine 
     return cls._engine 



# From the docs, handles multiprocessing 
@event.listens_for(_DB._get_engine(), "connect") 
def connect(dbapi_connection, connection_record): 
    connection_record.info['pid'] = os.getpid() 

#From the docs, handles multiprocessing 
@event.listens_for(_DB._get_engine(), "checkout") 
def checkout(dbapi_connection, connection_record, connection_proxy): 
    pid = os.getpid() 
    if connection_record.info['pid'] != pid: 
     connection_record.connection = connection_proxy.connection = None 
     raise exc.DisconnectionError(
      "Connection record belongs to pid %s, " 
      "attempting to check out in pid %s" % 
      (connection_record.info['pid'], pid) 
     ) 


# The following is how I create the scoped session object. 

Session = scoped_session(sessionmaker(
    bind=_DB._get_engine(), autocommit=False, autoflush=False)) 

Base = declarative_base() 
Base.query = Session.query_property() 

그래서 내 가정 : 나는 당신이 이 과정에서 엔진을 다시 사용할 수 없다는 것을 이해하고, 그래서이 워드 프로세서에서 그대로시 기반 솔루션을 따라 수행 한 다음과 같습니다 :

  1. 이 범위 세션 개체에서 생성 된 세션 객체를 사용하여, 그것은 항상 나에게 (내 경우에는 바로 자식 프로세스의 메인 쓰레드 것) ThreadLocal을 세션을 제공해야합니다. 비록 문서에서는 아니지만 범위가 지정된 세션 객체가 다른 프로세스에서 작성된 경우에도 적용되어야한다고 생각합니다.

  2. 있어 ThreadLocal 세션 연결이 프로세스 내에서 생성되지 않은 경우, 그것은 새 것으로 만드는 것, 엔진을 통해 풀에서 연결을 얻을 것이다 (상기 connection()checkout() 구현에 기초한다.)

두 가지 모두 사실이라면 모든 것이 "잘 작동합니다"(AFAICT). 그래도 그런 경우는 아닙니다.

새로운 프로세스마다 새로운 범위 지정 세션 객체를 만들고 세션을 사용하는 모든 후속 호출에서 을 사용하여 작동하도록했습니다.

BTW 속성은이 새 범위 개체에서 업데이트해야합니다.

위의 1 번 가정이 잘못되었다고 생각합니다. 누구도 왜 각 프로세스에서 새로운 범위 개체를 만들어야하는지 이해할 수 있습니까?

건배.

+0

당신이 포크 (fork) 코드뿐만 아니라, 전체 스택 추적을 포함, 최소한의 예를 게시 할 수 있습니까? 연결 풀이 포크 이전에 DB에 연결되어 소켓을 공유하는 두 프로세스로 연결되는 것에 대한 의혹이 있습니다. – univerio

+0

몇 가지 예제 코드를 추가하겠습니다. 어떤 포크가 만들어지기 전에 풀이 이미 연결되었을 것입니다 만, 풀을 사용하는 자식 프로세스는 호출 코드의 pid를 사용하거나 새로운 코드를 생성함으로써 (위의'checkout' 메쏘드에 따라) 처리됩니다. 적어도 AIUI 의도입니다. –

+0

'dask.multiprocessing.get'을 사용하는 대신 단일 노드 분산 스케줄러를 생성 할 수 있습니다. 이렇게하면 더 깨끗한 프로세스에서 사전 포크로 처리되며 일반적으로보다 깨끗한 환경을 경험할 수 있습니다. http://dask.pydata.org/en/latest/scheduler-choice.html – MRocklin

답변

0

포크가 발생했을 때 명확하지 않지만 가장 일반적인 문제는 엔진이 fork_size = 5로 데이터베이스에 대한 TCP 연결을 초기화하는 포크 ​​이전에 만들어지고 새로운 프로세스로 복사되고 동일한 물리적 소켓과 상호 작용하는 여러 프로세스가 발생합니다 => 문제.

  • 풀을 사용하지 않도록 수요와 연결에 사용 : poolclass을 = NullPool
  • 다시 만듭니다 포크 후 풀 :에

    옵션은 sqla_engine합니다.dispose()

  • 지연 포크 후까지 create_engine
+0

엔진이 생성 된 후에 확실히 fork 될 것이지만, AIUI가 그 요점이다. 커스텀'checkout()'메소드의; 하나의 풀을 사용하여 여러 프로세스를 처리합니다. 나는 그것이 dask가 프로세스를 포크하는 방식과 관련이 있다고 생각합니다. –