2015-01-21 3 views
0

출력이 합계와 최대 날짜를 필요로하는 map/reduced가 필요한 파일이 있습니다. 그러나 합계 부분이 작동하지만, 감소 된 출력의 일부로 최대 날짜를 포함시키는 방법을 모르겠습니다.어떻게 합계와 최대 날짜로 매핑/축소 하시겠습니까?

입력 데이터는 다음과 같은 : 그들은 그룹화하고 있도록

ID1, ID2, date,    count 
3000, 001, 2014-12-30 18:00:00, 2 
3000, 001, 2015-01-01 10:00:00, 1 
3000, 002, 2014-11-18 12:53:00, 5 
3000, 002, 2014-12-20 20:14:00, 3 

내 매퍼 ID1 + ID2를 연결합니다. 출력은 다음과 같습니다

key (ID1|ID2), value (count) 
3000|001,  2 
3000|001,  1 
3000|002,  5 
3000|002,  3 

감속기 출력은 다음과 같습니다 맵퍼 및 감속기이 기록

key (ID1|ID2), value (sum), date (max) 
3000|001,  3,   2015-01-01 10:00:00 
3000|002,  8,   2014-12-20 20:14:00 

:

정말 필요한 것은
key (ID1|ID2), value (sum) 
3000|001,  3 
3000|002,  8 

는이 같은 출력 그러나 Ruby에서는 Python으로 작성된 예제를 사용합니다 (Ruby로 변환 해 보겠습니다).

require 'csv' 

pattern = File.join(File.expand_path('data', File.dirname(__FILE__)), '*.txt') 

Dir.glob(pattern).each do |file| 
    CSV.foreach(file, {col_sep: "\t", headers: false}) do |row| 
    puts [ 
      "#{row[6]}|#{row[3].rjust(8, '0')}", # key = ID1 | ID2 
      row[7] # value = count 
     ].join("\t") 
    end 
end 

그리고 감속기 : 여기

는 매퍼 코드의

prev_key = nil 
key_total = 0 

ARGF.each do |line| 
    line = line.chomp 
    next unless line 

    (key, value) = line.split("\t") 

    # check for new key 
    if prev_key && key != prev_key && key_total > 0 

    # output total for previous key 
    puts [prev_key, key_total].join("\t") 

    # reset key total for new key 
    prev_key = key 
    key_total = 0 

    elsif !prev_key 
    prev_key = key 

    end 

    # add to count for this current key 
    key_total += value.to_i 

end 

# this is to catch the final counts after all records have been received 
puts [prev_key, key_total].join("\t") 

UPDATE

여기 허용 대답에서 제안에 따라 새로운 맵퍼 및 감속기 :

매퍼 :

require 'csv' 

pattern = File.join(File.expand_path('data', File.dirname(__FILE__)), '*.txt') 

Dir.glob(pattern).each do |file| 
    CSV.foreach(file, {col_sep: "\t", headers: false}) do |row| 
    date_time = "#{row[0]} #{row[1]}:00:00#{row[2]}" # %Y-%m-%d %H:%M:%S%z 
    puts [ 
      "#{row[6]}|#{row[3].rjust(8, '0')}", # key = ID1 | ID2 
      "#{row[7]}|#{date_time}", # value = count | date_time 
     ].join("\t") 
    end 
end 

감속기 :

require 'date' 

prev_key = nil 
key_total = 0 
dates = [] 

ARGF.each do |line| 
    line = line.chomp 
    next unless line 

    (key, values) = line.split("\t") 
    (value, date_time) = values.split('|') 

    # check for new key 
    if prev_key && key != prev_key && key_total > 0 

    # output total for previous key 
    puts [prev_key.split('|'), key_total, dates.max].join("\t") 

    # reset key total for new key 
    prev_key = key 
    key_total = 0 

    # reset dates array for new key 
    dates.clear 

    elsif !prev_key 
    prev_key = key 

    end 

    # add date to array for this current key 
    dates << DateTime.strptime(date_time, '%Y-%m-%d %H:%M:%S%z') 

    # add to count for this current key 
    key_total += value.to_i 

end 

# this is to catch the final counts after all records have been received 
puts [prev_key.split('|'), key_total, dates.max].join("\t") 

답변

0

당신이 필요로 날짜를 넣어 계산 한 쌍의 < 날짜에, >을 계산하고 매퍼에서 값으로 방출하는 것입니다. 그런 다음 감속기에서 날짜를 추출하고 쌍 값에서 계산합니다. 합계는 이미했던 것처럼 계산하고 입력 값 (키당)에서 최대 날짜를 추적합니다.