2017-04-10 7 views
1

1 천만 개 이상의 분광 데이터 세트를 처리해야합니다. 데이터는 다음과 같이 구성되어 있습니다. 약 1000 개의 .fits (.fits는 일부 데이터 저장 형식) 파일이 있으며 각 파일에는 각 스펙트럼에 약 4500 개의 요소가있는 600-1000 개의 스펙트럼이 들어 있습니다 (각 파일은 ~ 1000을 반환합니다). * 4500 매트릭스). 즉, 1000 만 개 이상의 항목을 반복 할 경우 각 스펙트럼을 약 10 회 반복하여 읽게 될 것입니다 (또는 각 파일은 약 10,000 회 반복하여 읽게 될 것입니다). 동일한 스펙트럼이 약 10 회 반복해서 읽히지 만 동일한 스펙트럼의 다른 세그먼트를 추출 할 때마다 중복되지는 않습니다. @Paul Panzer의 도움으로 이미 같은 파일을 여러 번 읽는 것을 피합니다.HPC에서 과도한 입/출력을 처리하기위한 다중 처리 구현

나는 x, y가, 반경 r, 강도 s 등 카탈로그는 내가 읽을려고하는 파일을 대상으로 정보를 포함 좌표처럼 내가 필요로하는 모든 정보를 (포함하는 카탈로그 파일이 n1, n2으로 식별) 및 해당 파일의 어떤 스펙트럼 (n3으로 식별 됨)을 사용하는지 확인하십시오.

내가 지금 가지고있는 코드는 다음과 같습니다

import numpy as np 
from itertools import izip 
import itertools 
import fitsio 

x = [] 
y = [] 
r = [] 
s = [] 
n1 = [] 
n2 = [] 
n3 = [] 
with open('spectra_ID.dat') as file_ID, open('catalog.txt') as file_c: 
    for line1, line2 in izip(file_ID,file_c): 
     parts1 = line1.split() 
     parts2 = line2.split() 
     n1.append(int(parts1[0])) 
     n2.append(int(parts1[1])) 
     n3.append(int(parts1[2])) 
     x.append(float(parts2[0]))   
     y.append(float(parts2[1]))   
     r.append(float(parts2[2])) 
     s.append(float(parts2[3])) 

def data_analysis(n_galaxies): 
    n_num = 0 
    data = np.zeros((n_galaxies), dtype=[('spec','f4',(200)),('x','f8'),('y','f8'),('r','f8'),('s','f8')]) 

    idx = np.lexsort((n3,n2,n1)) 
    for kk,gg in itertools.groupby(zip(idx, n1[idx], n2[idx]), lambda x: x[1:]): 
     filename = "../../data/" + str(kk[0]) + "/spPlate-" + str(kk[0]) + "-" + str(kk[1]) + ".fits" 
     fits_spectra = fitsio.FITS(filename) 
     fluxx = fits_spectra[0].read() 
     n_element = fluxx.shape[1] 
     hdu = fits_spectra[0].read_header() 
     wave_start = hdu['CRVAL1'] 
     logwave = wave_start + 0.0001 * np.arange(n_element) 
     wavegrid = np.power(10,logwave) 

     for ss, plate1, mjd1 in gg: 
      if n_num % 1000000 == 0: 
       print n_num 
      n3new = n3[ss]-1 
      flux = fluxx[n3new] 
      ### following is my data reduction of individual spectra, I will skip here 
      ### After all my analysis, I have the data storage as below: 
      data['spec'][n_num] = flux_intplt 
      data['x'][n_num] = x[ss] 
      data['y'][n_num] = y[ss] 
      data['r'][n_num] = r[ss] 
      data['s'][n_num] = s[ss] 

      n_num += 1 

    print n_num 
    data_output = FITS('./analyzedDATA/data_ALL.fits','rw') 
    data_output.write(data) 

내가 종류의 멀티 하나 개의 루프를 제거 할 수 있지만 함수에 인덱스를 통과 할 필요가 있음을 이해합니다. 그러나, 내 기능에는 두 개의 루프가 있으며 그 두 개는 서로 상관 관계가 높으므로 접근 방법을 모릅니다. 이 코드에서 가장 시간이 많이 소요되는 부분은 디스크에서 파일을 읽는 것이므로 멀티 프로세싱은 코어를 최대한 활용하여 한 번에 여러 파일을 읽어야합니다. 어느 누구도 내게 불을 붙일 수 있습니까?

답변

0
  1. global vars 제거하십시오, 당신은 하나 개의 데이터로 이 같은 스펙트럼의 다른 세그먼트를 할당 global vars 하나에 컨테이너 클래스 또는 DICT하여 여러 병합 processes
  2. global vars를 사용
  3. 을 설정할 수 없습니다 이동를 A 자신의 def ...
  4. def ...
  5. 별도 data_outputglobal with open(...
  6. multiprocessing없이,이 개념을 첫번째보십시오 :

    for line1, line2 in izip(file_ID,file_c): 
        data_set = create data set from (line1, line2) 
        result = data_analysis(data_set) 
        data_output.write(data) 
    
  7. 파일을 작성하기위한 하나를 읽고 파일이 processes 하나를 사용하는 것이 좋습니다. data_analysismultiprocessing.Pool(processes=n)을 사용하십시오.
    multiprocessing.Manager().Queue()

+0

당신이 비록 코드를 적어겠습니까 사용 processes 사이에 통신? 나는 그것을 분명히 이해할 수 없었다. –

+0

포인트 2로 시작하십시오. 3. 질문에 따라 코드를 편집하십시오. – stovfl