다중 처리/멀티 스레딩을 구현하고자하는 작은 웹 스크레이퍼를 코딩하고 있습니다.Q : 멀티 스레딩/멀티 프로세싱을 사용하여 .CSV 파일에 함수 출력을 작성하는 방법은 무엇입니까? (문자열 배열을 입력으로 사용)
웹 사이트 URL이 입력 된 문자열을 받고, 일부 도메인 데이터를 스크랩하고, 각 도메인에 대해 줄 단위로 CSV 파일에 해당 데이터를 쓰는 내 함수 webScraper()를 작성했습니다.
모든 URL이 포함 된 입력 데이터는 urls = ["google.com", "yahoo.com", "bing.com"]
과 같은 문자열 배열에 저장됩니다. CSV 파일에서 URL 가져 오기로 변경하는 것이 좋습니다.
불일치 및 색인 외부 오류가 발생하지 않고 어떻게 다중 출력을 사용하고 함수 출력을 CSV 파일에 쓸 수 있습니까? 멋진 스크립트를 찾았는데 정확히 필요한 스크립트 인 것 같습니다. 불행히도 며칠 전 Java에서 Python으로 전환 했으므로 정확하게 변경해야 할 항목을 파악할 수 없습니다.
기본적으로 아래 스크립트는 내 문자열 배열 urls
또는 입력 CSV 파일에있는 각 URL에 대해 내 함수 webScraper(url)
을 호출하도록 변경하려고합니다. 그런 다음 스크립트는 각 배열 항목에 대한 함수 출력을 내 CSV에 줄 단위로 작성해야합니다 (코드를 올바르게 이해 한 경우). 나는 멀티에 관련된 CSV 파일에 어떤을 writting 없었다 경우 정말 나에게 문제가되지 않을 것이다 (Thanks to hbar for the nice code!)
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# multiproc_sums.py
"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file, using multiple processes if desired.
"""
import csv
import multiprocessing
import optparse
import sys
NUM_PROCS = multiprocessing.cpu_count()
def make_cli_parser():
"""Make the command line interface parser."""
usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
__doc__,
"""
ARGUMENTS:
INPUT_CSV: an input CSV file with rows of numbers
OUTPUT_CSV: an output file that will contain the sums\
"""])
cli_parser = optparse.OptionParser(usage)
cli_parser.add_option('-n', '--numprocs', type='int',
default=NUM_PROCS,
help="Number of processes to launch [DEFAULT: %default]")
return cli_parser
class CSVWorker(object):
def __init__(self, numprocs, infile, outfile):
self.numprocs = numprocs
self.infile = open(infile)
self.outfile = outfile
self.in_csvfile = csv.reader(self.infile)
self.inq = multiprocessing.Queue()
self.outq = multiprocessing.Queue()
self.pin = multiprocessing.Process(target=self.parse_input_csv, args=())
self.pout = multiprocessing.Process(target=self.write_output_csv, args=())
self.ps = [ multiprocessing.Process(target=self.sum_row, args=())
for i in range(self.numprocs)]
self.pin.start()
self.pout.start()
for p in self.ps:
p.start()
self.pin.join()
i = 0
for p in self.ps:
p.join()
print "Done", i
i += 1
self.pout.join()
self.infile.close()
def parse_input_csv(self):
"""Parses the input CSV and yields tuples with the index of the row
as the first element, and the integers of the row as the second
element.
The index is zero-index based.
The data is then sent over inqueue for the workers to do their
thing. At the end the input process sends a 'STOP' message for each
worker.
"""
for i, row in enumerate(self.in_csvfile):
row = [ int(entry) for entry in row ]
self.inq.put((i, row))
for i in range(self.numprocs):
self.inq.put("STOP")
def sum_row(self):
"""
Workers. Consume inq and produce answers on outq
"""
tot = 0
for i, row in iter(self.inq.get, "STOP"):
self.outq.put((i, sum(row)))
self.outq.put("STOP")
def write_output_csv(self):
"""
Open outgoing csv file then start reading outq for answers
Since I chose to make sure output was synchronized to the input there
is some extra goodies to do that.
Obviously your input has the original row number so this is not
required.
"""
cur = 0
stop = 0
buffer = {}
# For some reason csv.writer works badly across processes so open/close
# and use it all in the same process or else you'll have the last
# several rows missing
outfile = open(self.outfile, "w")
self.out_csvfile = csv.writer(outfile)
#Keep running until we see numprocs STOP messages
for works in range(self.numprocs):
for i, val in iter(self.outq.get, "STOP"):
# verify rows are in order, if not save in buffer
if i != cur:
buffer[i] = val
else:
#if yes are write it out and make sure no waiting rows exist
self.out_csvfile.writerow([i, val])
cur += 1
while cur in buffer:
self.out_csvfile.writerow([ cur, buffer[cur] ])
del buffer[cur]
cur += 1
outfile.close()
def main(argv):
cli_parser = make_cli_parser()
opts, args = cli_parser.parse_args(argv)
if len(args) != 2:
cli_parser.error("Please provide an input file and output file.")
c = CSVWorker(opts.numprocs, args[0], args[1])
if __name__ == '__main__':
main(sys.argv[1:])
모든 것은 일하고 코드의
. 나는 이미 다른 해결책으로 파이썬 맵 풀 (link)을 시도했지만 성공하지는 못했습니다. 나는 풀장들 사이에 오류를 가져온 불일치가 있다고 생각합니다.
의견을 보내 주셔서 감사합니다.