0
출력이 합계와 최대 날짜를 필요로하는 map/reduced가 필요한 파일이 있습니다. 그러나 합계 부분이 작동하지만, 감소 된 출력의 일부로 최대 날짜를 포함시키는 방법을 모르겠습니다.어떻게 합계와 최대 날짜로 매핑/축소 하시겠습니까?
입력 데이터는 다음과 같은 : 그들은 그룹화하고 있도록
ID1, ID2, date, count
3000, 001, 2014-12-30 18:00:00, 2
3000, 001, 2015-01-01 10:00:00, 1
3000, 002, 2014-11-18 12:53:00, 5
3000, 002, 2014-12-20 20:14:00, 3
내 매퍼 ID1 + ID2를 연결합니다. 출력은 다음과 같습니다
key (ID1|ID2), value (count)
3000|001, 2
3000|001, 1
3000|002, 5
3000|002, 3
감속기 출력은 다음과 같습니다 맵퍼 및 감속기이 기록
key (ID1|ID2), value (sum), date (max)
3000|001, 3, 2015-01-01 10:00:00
3000|002, 8, 2014-12-20 20:14:00
:
정말 필요한 것은key (ID1|ID2), value (sum)
3000|001, 3
3000|002, 8
는이 같은 출력 그러나 Ruby에서는 Python으로 작성된 예제를 사용합니다 (Ruby로 변환 해 보겠습니다).
require 'csv'
pattern = File.join(File.expand_path('data', File.dirname(__FILE__)), '*.txt')
Dir.glob(pattern).each do |file|
CSV.foreach(file, {col_sep: "\t", headers: false}) do |row|
puts [
"#{row[6]}|#{row[3].rjust(8, '0')}", # key = ID1 | ID2
row[7] # value = count
].join("\t")
end
end
그리고 감속기 : 여기
는 매퍼 코드의prev_key = nil
key_total = 0
ARGF.each do |line|
line = line.chomp
next unless line
(key, value) = line.split("\t")
# check for new key
if prev_key && key != prev_key && key_total > 0
# output total for previous key
puts [prev_key, key_total].join("\t")
# reset key total for new key
prev_key = key
key_total = 0
elsif !prev_key
prev_key = key
end
# add to count for this current key
key_total += value.to_i
end
# this is to catch the final counts after all records have been received
puts [prev_key, key_total].join("\t")
UPDATE
여기 허용 대답에서 제안에 따라 새로운 맵퍼 및 감속기 :
매퍼 :
require 'csv'
pattern = File.join(File.expand_path('data', File.dirname(__FILE__)), '*.txt')
Dir.glob(pattern).each do |file|
CSV.foreach(file, {col_sep: "\t", headers: false}) do |row|
date_time = "#{row[0]} #{row[1]}:00:00#{row[2]}" # %Y-%m-%d %H:%M:%S%z
puts [
"#{row[6]}|#{row[3].rjust(8, '0')}", # key = ID1 | ID2
"#{row[7]}|#{date_time}", # value = count | date_time
].join("\t")
end
end
감속기 :
require 'date'
prev_key = nil
key_total = 0
dates = []
ARGF.each do |line|
line = line.chomp
next unless line
(key, values) = line.split("\t")
(value, date_time) = values.split('|')
# check for new key
if prev_key && key != prev_key && key_total > 0
# output total for previous key
puts [prev_key.split('|'), key_total, dates.max].join("\t")
# reset key total for new key
prev_key = key
key_total = 0
# reset dates array for new key
dates.clear
elsif !prev_key
prev_key = key
end
# add date to array for this current key
dates << DateTime.strptime(date_time, '%Y-%m-%d %H:%M:%S%z')
# add to count for this current key
key_total += value.to_i
end
# this is to catch the final counts after all records have been received
puts [prev_key.split('|'), key_total, dates.max].join("\t")