2015-02-06 3 views
0

데이터베이스 테이블이 있습니다.이 인스턴스에서 ~ 360k 행의 행을 읽고 나중에이 사용 스크립트를 사용하여 쓰기 위해 pyodbc.row 개체를 목록에 넣습니다. 다음과 같은 출력 내가 목록을 분할하는 방법에 대한/체크 불분명 오전 추측파이썬 다중 프로세스 풀 작동을위한 도움이 필요합니다.

c:\Python27>.\python27.exe .\multitest.py 
Starting multiprocessing pool.. 
We Read 361554 rows in 0:00:14.064000 
<type 'list'> 
Processing 361554 rows 
Splitting into 8 chunks with 45194.0 rows each 
POOL: Received 8 rows to work on 

을 제공

from hashlib import sha1 
import multiprocessing 
import datetime, os 
import pyodbc 
import math 
import traceback, sys 
source_rows = [] 
processors = 8 

def split(a, n): 
    k, m = len(a)/n, len(a) % n 
    return (a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in xrange(n)) 

def sqn(*args): 
    return sha1('}#EUCLID#{'.join([str(arg.upper().strip()) for arg in args]).encode()).hexdigest().upper() 

def sDB_read(): 
    t1 = datetime.datetime.now() 
    #Initialize Source Database 
    src_driver='{SQL Server Native Client 11.0}' 
    src_server='SCCMSV2SS010' 
    src_database='STAGE' 
    src_trusted_conn='yes' 
    src_uid = 'myUserID' 
    src_pwd = 'myPwd' 
    if src_trusted_conn == 'yes': 
     src_conn_str = """Driver=""" + src_driver + """;Server=""" + src_server + """;Database=""" + src_database + """;Trusted_Connection=""" + src_trusted_conn + """;PacketSize=32767;""" 
    else: 
     src_conn_str = """Driver=""" + src_driver + """;Server=""" + src_server + """;Database=""" + src_database + """;UID=""" + src_UID + """;PWD=""" + src_pwd + """;PacketSize=32767;""" 
    sql = 'SELECT [AgentID]    ,[BootDevice00]    ,[BuildNumber00]    ,[BuildType00]    ,[Caption00]    ,[CodeSet00]    ,[CountryCode00]    ,[CSDVersion00]    ,[CurrentTimeZone00]    ,[Debug00]    ,[Description00]    ,[Distributed00]    ,[ForegroundApplicationBoost00]    ,[FreePhysicalMemory00]    ,[FreeSpaceInPagingFiles00]    ,[FreeVirtualMemory00]    ,[InstallDate00]    ,[InstanceKey]    ,[LastBootUpTime00]    ,[LocalDateTime00]    ,[Locale00]    ,[MachineID]    ,[Manufacturer00]    ,[MaxNumberOfProcesses00]    ,[MaxProcessMemorySize00]    ,[Name00]    ,[NumberOfLicensedUsers00]    ,[NumberOfProcesses00]    ,[NumberOfUsers00]    ,[OperatingSystemSKU00]    ,[Organization00]    ,[OSArchitecture00]    ,[OSLanguage00]    ,[OSProductSuite00]    ,[OSType00]    ,[OtherTypeDescription00]    ,[PlusProductID00]    ,[PlusVersionNumber00]    ,[Primary00]    ,[ProductType00]    ,[RegisteredUser00]    ,[RevisionID]    ,[rowversion]    ,[SerialNumber00]    ,[ServicePackMajorVersion00]    ,[ServicePackMinorVersion00]    ,[SizeStoredInPagingFiles00]    ,[Status00]    ,[SystemDevice00]    ,[SystemDirectory00]    ,[TimeKey]    ,[TotalSwapSpaceSize00]    ,[TotalVirtualMemorySize00]    ,[TotalVisibleMemorySize00]    ,[Version00]    ,[WindowsDirectory00]   FROM [STAGE].[dbo].[Operating_System_DATA]' 
    src_db_conn = pyodbc.connect(src_conn_str) 
    src_db_conn.autocommit = False 
    src_db_cursor = src_db_conn.cursor()  
    src_db_cursor.execute(sql)  
    source_rows = [ {c[0]: v for (c, v) in zip(row.cursor_description, row)} for row in src_db_cursor.fetchall() ] 
    t2 = datetime.datetime.now() 
    print('\nWe Read ' + str(len(source_rows)) + ' rows in ' + str(t2 - t1)) 
    return source_rows 

def tDB_write():  
    print('\nPOOL: Received ' + str(len(source_rows)) + ' rows to work on') 
    t1 = datetime.datetime.now() 
    #Initialize Target Database  
    targ_driver='{SQL Server Native Client 11.0}' 
    targ_server='SCCMSV2SS010' 
    targ_database='STAGE' 
    targ_trusted_conn='yes' 
    targ_uid = 'myUserID' 
    targ_pwd = 'myPwd' 
    if targ_trusted_conn == 'yes': 
     targ_conn_str = """Driver=""" + targ_driver + """;Server=""" + targ_server + """;Database=""" + targ_database + """;Trusted_Connection=""" + targ_trusted_conn + """;PacketSize=32767;""" 
    else: 
     targ_conn_str = """Driver=""" + targ_driver + """;Server=""" + targ_server + """;Database=""" + targ_database + """;UID=""" + targ_UID + """;PWD=""" + targ_pwd + """;PacketSize=32767;""" 
    targ_db_conn = pyodbc.connect(targ_conn_str) 
    targ_db_conn.autocommit = False 
    targ_db_cursor = targ_db_conn.cursor()   
    table = 'Operating_System_DATA_INSERT'  
    for sourceitems in source_rows:   
     for source_row in sourceitems: 
      try: 
       sql = ''       
       #print(str(source_row)) 
       columns = ', '.join(source_row.keys())    
       placeholders = ', '.join(['?'] * len(source_row)) 
       obj = source_row.values() 
       sql = "targ_db_cursor.execute('INSERT into {} ({}) VALUES ({})', {})".format(table, columns, placeholders , obj) 
       #print(sql) 
       res = eval(sql) 
      except Exception,e: 
       traceback.print_exc(file=sys.stdout)    
    targ_db_conn.commit() 
    t2 = datetime.datetime.now() 
    print('\nWe Wrote ' + str(len(source_rows)) + ' rows in ' + str(t2 - t1)) 
    return 

if __name__ == '__main__':  
    print('\nStarting multiprocessing pool..') 
    pool = multiprocessing.Pool(processes = processors) 
    source_rows = sDB_read() 
    print(type(source_rows)) 
    targetsize = len(source_rows) 
    print('\nProcessing ' + str(targetsize) + ' rows') 
    chunksize = math.ceil(len(source_rows)/processors)  
    print('Splitting into ' + str(processors) + " chunks with " + str(chunksize) + ' rows each') 
    source_rows = list(split(source_rows , processors)) 
    write_pool_outputs = pool.map(tDB_write() , source_rows) 
    print('\nClosing multiprocessing pool..') 
    pool.close() 
    pool.join() 

은 각각의 근로자가 작업 할 행의 동일한 점유율을 가져옵니다. 아무리 목록을 수동으로 분할하려고하거나 변경되지 않은 풀을 pool.map에 직접 전달하면 모든 행을 가져 오는 작업자가 한 명만 시작됩니다. 어떤 사람이 이것을 성취하기 위해 적절한 방법으로 나를 교육시킬 수 있습니까?

답변