0
데이터베이스 테이블이 있습니다.이 인스턴스에서 ~ 360k 행의 행을 읽고 나중에이 사용 스크립트를 사용하여 쓰기 위해 pyodbc.row 개체를 목록에 넣습니다. 다음과 같은 출력 내가 목록을 분할하는 방법에 대한/체크 불분명 오전 추측파이썬 다중 프로세스 풀 작동을위한 도움이 필요합니다.
c:\Python27>.\python27.exe .\multitest.py
Starting multiprocessing pool..
We Read 361554 rows in 0:00:14.064000
<type 'list'>
Processing 361554 rows
Splitting into 8 chunks with 45194.0 rows each
POOL: Received 8 rows to work on
을 제공
from hashlib import sha1
import multiprocessing
import datetime, os
import pyodbc
import math
import traceback, sys
source_rows = []
processors = 8
def split(a, n):
k, m = len(a)/n, len(a) % n
return (a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in xrange(n))
def sqn(*args):
return sha1('}#EUCLID#{'.join([str(arg.upper().strip()) for arg in args]).encode()).hexdigest().upper()
def sDB_read():
t1 = datetime.datetime.now()
#Initialize Source Database
src_driver='{SQL Server Native Client 11.0}'
src_server='SCCMSV2SS010'
src_database='STAGE'
src_trusted_conn='yes'
src_uid = 'myUserID'
src_pwd = 'myPwd'
if src_trusted_conn == 'yes':
src_conn_str = """Driver=""" + src_driver + """;Server=""" + src_server + """;Database=""" + src_database + """;Trusted_Connection=""" + src_trusted_conn + """;PacketSize=32767;"""
else:
src_conn_str = """Driver=""" + src_driver + """;Server=""" + src_server + """;Database=""" + src_database + """;UID=""" + src_UID + """;PWD=""" + src_pwd + """;PacketSize=32767;"""
sql = 'SELECT [AgentID] ,[BootDevice00] ,[BuildNumber00] ,[BuildType00] ,[Caption00] ,[CodeSet00] ,[CountryCode00] ,[CSDVersion00] ,[CurrentTimeZone00] ,[Debug00] ,[Description00] ,[Distributed00] ,[ForegroundApplicationBoost00] ,[FreePhysicalMemory00] ,[FreeSpaceInPagingFiles00] ,[FreeVirtualMemory00] ,[InstallDate00] ,[InstanceKey] ,[LastBootUpTime00] ,[LocalDateTime00] ,[Locale00] ,[MachineID] ,[Manufacturer00] ,[MaxNumberOfProcesses00] ,[MaxProcessMemorySize00] ,[Name00] ,[NumberOfLicensedUsers00] ,[NumberOfProcesses00] ,[NumberOfUsers00] ,[OperatingSystemSKU00] ,[Organization00] ,[OSArchitecture00] ,[OSLanguage00] ,[OSProductSuite00] ,[OSType00] ,[OtherTypeDescription00] ,[PlusProductID00] ,[PlusVersionNumber00] ,[Primary00] ,[ProductType00] ,[RegisteredUser00] ,[RevisionID] ,[rowversion] ,[SerialNumber00] ,[ServicePackMajorVersion00] ,[ServicePackMinorVersion00] ,[SizeStoredInPagingFiles00] ,[Status00] ,[SystemDevice00] ,[SystemDirectory00] ,[TimeKey] ,[TotalSwapSpaceSize00] ,[TotalVirtualMemorySize00] ,[TotalVisibleMemorySize00] ,[Version00] ,[WindowsDirectory00] FROM [STAGE].[dbo].[Operating_System_DATA]'
src_db_conn = pyodbc.connect(src_conn_str)
src_db_conn.autocommit = False
src_db_cursor = src_db_conn.cursor()
src_db_cursor.execute(sql)
source_rows = [ {c[0]: v for (c, v) in zip(row.cursor_description, row)} for row in src_db_cursor.fetchall() ]
t2 = datetime.datetime.now()
print('\nWe Read ' + str(len(source_rows)) + ' rows in ' + str(t2 - t1))
return source_rows
def tDB_write():
print('\nPOOL: Received ' + str(len(source_rows)) + ' rows to work on')
t1 = datetime.datetime.now()
#Initialize Target Database
targ_driver='{SQL Server Native Client 11.0}'
targ_server='SCCMSV2SS010'
targ_database='STAGE'
targ_trusted_conn='yes'
targ_uid = 'myUserID'
targ_pwd = 'myPwd'
if targ_trusted_conn == 'yes':
targ_conn_str = """Driver=""" + targ_driver + """;Server=""" + targ_server + """;Database=""" + targ_database + """;Trusted_Connection=""" + targ_trusted_conn + """;PacketSize=32767;"""
else:
targ_conn_str = """Driver=""" + targ_driver + """;Server=""" + targ_server + """;Database=""" + targ_database + """;UID=""" + targ_UID + """;PWD=""" + targ_pwd + """;PacketSize=32767;"""
targ_db_conn = pyodbc.connect(targ_conn_str)
targ_db_conn.autocommit = False
targ_db_cursor = targ_db_conn.cursor()
table = 'Operating_System_DATA_INSERT'
for sourceitems in source_rows:
for source_row in sourceitems:
try:
sql = ''
#print(str(source_row))
columns = ', '.join(source_row.keys())
placeholders = ', '.join(['?'] * len(source_row))
obj = source_row.values()
sql = "targ_db_cursor.execute('INSERT into {} ({}) VALUES ({})', {})".format(table, columns, placeholders , obj)
#print(sql)
res = eval(sql)
except Exception,e:
traceback.print_exc(file=sys.stdout)
targ_db_conn.commit()
t2 = datetime.datetime.now()
print('\nWe Wrote ' + str(len(source_rows)) + ' rows in ' + str(t2 - t1))
return
if __name__ == '__main__':
print('\nStarting multiprocessing pool..')
pool = multiprocessing.Pool(processes = processors)
source_rows = sDB_read()
print(type(source_rows))
targetsize = len(source_rows)
print('\nProcessing ' + str(targetsize) + ' rows')
chunksize = math.ceil(len(source_rows)/processors)
print('Splitting into ' + str(processors) + " chunks with " + str(chunksize) + ' rows each')
source_rows = list(split(source_rows , processors))
write_pool_outputs = pool.map(tDB_write() , source_rows)
print('\nClosing multiprocessing pool..')
pool.close()
pool.join()
은 각각의 근로자가 작업 할 행의 동일한 점유율을 가져옵니다. 아무리 목록을 수동으로 분할하려고하거나 변경되지 않은 풀을 pool.map에 직접 전달하면 모든 행을 가져 오는 작업자가 한 명만 시작됩니다. 어떤 사람이 이것을 성취하기 위해 적절한 방법으로 나를 교육시킬 수 있습니까?
나는 그 질문을 오해했다. +1 then – That1Guy
ty 선생님 정확히 내가 필요한 찔러. –