0

다중 처리/멀티 스레딩을 구현하고자하는 작은 웹 스크레이퍼를 코딩하고 있습니다.Q : 멀티 스레딩/멀티 프로세싱을 사용하여 .CSV 파일에 함수 출력을 작성하는 방법은 무엇입니까? (문자열 배열을 입력으로 사용)

웹 사이트 URL이 입력 된 문자열을 받고, 일부 도메인 데이터를 스크랩하고, 각 도메인에 대해 줄 단위로 CSV 파일에 해당 데이터를 쓰는 내 함수 webScraper()를 작성했습니다.

모든 URL이 포함 된 입력 데이터는 urls = ["google.com", "yahoo.com", "bing.com"]과 같은 문자열 배열에 저장됩니다. CSV 파일에서 URL 가져 오기로 변경하는 것이 좋습니다.

불일치 및 색인 외부 오류가 발생하지 않고 어떻게 다중 출력을 사용하고 함수 출력을 CSV 파일에 쓸 수 있습니까? 멋진 스크립트를 찾았는데 정확히 필요한 스크립트 인 것 같습니다. 불행히도 며칠 전 Java에서 Python으로 전환 했으므로 정확하게 변경해야 할 항목을 파악할 수 없습니다.

기본적으로 아래 스크립트는 내 문자열 배열 urls 또는 입력 CSV 파일에있는 각 URL에 대해 내 함수 webScraper(url)을 호출하도록 변경하려고합니다. 그런 다음 스크립트는 각 배열 항목에 대한 함수 출력을 내 CSV에 줄 단위로 작성해야합니다 (코드를 올바르게 이해 한 경우). 나는 멀티에 관련된 CSV 파일에 어떤을 writting 없었다 경우 정말 나에게 문제가되지 않을 것이다 (Thanks to hbar for the nice code!)

#!/usr/bin/env python 
# -*- coding: UTF-8 -*- 
# multiproc_sums.py 
"""A program that reads integer values from a CSV file and writes out their 
sums to another CSV file, using multiple processes if desired. 
""" 

import csv 
import multiprocessing 
import optparse 
import sys 

NUM_PROCS = multiprocessing.cpu_count() 

def make_cli_parser(): 
    """Make the command line interface parser.""" 
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV", 
      __doc__, 
      """ 
ARGUMENTS: 
    INPUT_CSV: an input CSV file with rows of numbers 
    OUTPUT_CSV: an output file that will contain the sums\ 
"""]) 
    cli_parser = optparse.OptionParser(usage) 
    cli_parser.add_option('-n', '--numprocs', type='int', 
      default=NUM_PROCS, 
      help="Number of processes to launch [DEFAULT: %default]") 
    return cli_parser 

class CSVWorker(object): 
    def __init__(self, numprocs, infile, outfile): 
     self.numprocs = numprocs 
     self.infile = open(infile) 
     self.outfile = outfile 
     self.in_csvfile = csv.reader(self.infile) 
     self.inq = multiprocessing.Queue() 
     self.outq = multiprocessing.Queue() 

     self.pin = multiprocessing.Process(target=self.parse_input_csv, args=()) 
     self.pout = multiprocessing.Process(target=self.write_output_csv, args=()) 
     self.ps = [ multiprocessing.Process(target=self.sum_row, args=()) 
         for i in range(self.numprocs)] 

     self.pin.start() 
     self.pout.start() 
     for p in self.ps: 
      p.start() 

     self.pin.join() 
     i = 0 
     for p in self.ps: 
      p.join() 
      print "Done", i 
      i += 1 

     self.pout.join() 
     self.infile.close() 

    def parse_input_csv(self): 
      """Parses the input CSV and yields tuples with the index of the row 
      as the first element, and the integers of the row as the second 
      element. 

      The index is zero-index based. 

      The data is then sent over inqueue for the workers to do their 
      thing. At the end the input process sends a 'STOP' message for each 
      worker. 
      """ 
      for i, row in enumerate(self.in_csvfile): 
       row = [ int(entry) for entry in row ] 
       self.inq.put((i, row)) 

      for i in range(self.numprocs): 
       self.inq.put("STOP") 

    def sum_row(self): 
     """ 
     Workers. Consume inq and produce answers on outq 
     """ 
     tot = 0 
     for i, row in iter(self.inq.get, "STOP"): 
       self.outq.put((i, sum(row))) 
     self.outq.put("STOP") 

    def write_output_csv(self): 
     """ 
     Open outgoing csv file then start reading outq for answers 
     Since I chose to make sure output was synchronized to the input there 
     is some extra goodies to do that. 

     Obviously your input has the original row number so this is not 
     required. 
     """ 
     cur = 0 
     stop = 0 
     buffer = {} 
     # For some reason csv.writer works badly across processes so open/close 
     # and use it all in the same process or else you'll have the last 
     # several rows missing 
     outfile = open(self.outfile, "w") 
     self.out_csvfile = csv.writer(outfile) 

     #Keep running until we see numprocs STOP messages 
     for works in range(self.numprocs): 
      for i, val in iter(self.outq.get, "STOP"): 
       # verify rows are in order, if not save in buffer 
       if i != cur: 
        buffer[i] = val 
       else: 
        #if yes are write it out and make sure no waiting rows exist 
        self.out_csvfile.writerow([i, val]) 
        cur += 1 
        while cur in buffer: 
         self.out_csvfile.writerow([ cur, buffer[cur] ]) 
         del buffer[cur] 
         cur += 1 

     outfile.close() 

def main(argv): 
    cli_parser = make_cli_parser() 
    opts, args = cli_parser.parse_args(argv) 
    if len(args) != 2: 
     cli_parser.error("Please provide an input file and output file.") 

    c = CSVWorker(opts.numprocs, args[0], args[1]) 

if __name__ == '__main__': 
    main(sys.argv[1:]) 

모든 것은 일하고 코드의

. 나는 이미 다른 해결책으로 파이썬 맵 풀 (link)을 시도했지만 성공하지는 못했습니다. 나는 풀장들 사이에 오류를 가져온 불일치가 있다고 생각합니다.

의견을 보내 주셔서 감사합니다.

답변

0

내가 처리 할 방법은 다중 처리를 사용하여 웹 스크래핑을 수행 한 다음 단일 프로세스를 사용하여 CSV에 기록하는 것입니다. 나는 스크래핑이 시간 소모적 인 부분이며 I/O가 빠르다는 것을 내기를 기꺼이합니다. 아래는 Pool.map을 사용하여 함수를 다중 처리하는 코드 스 니펫입니다.

import multiprocessing as mp 
import csv 

pool = mp.Pool(processes=mp.cpu_count()) 
# or however many processors you can support 

scraped_data = pool.map(webScraper, urls) 

with open('out.csv') as outfile: 
    wr = csv.writer(outfile) 
    wr.writerow(scraped_data)