2016-06-03 6 views
2

Kairosdb의 모든 메트릭 값에서 CSV 파일을 만들 필요가 있습니다.Java 8 - Kairosdb의 여러 객체 목록을 CSV 파일로 저장

kairosdb UI에는 이미 저장 기능이 있지만 내 보낸 파일에 메트릭 이름이 없습니다. 또한 여러 측정 항목을 단일 파일로 내보낼 수 없습니다.

내가 직면 한 문제는 여러 측정 항목의 타임 스탬프를 일치시키는 것입니다. 예를 들어, 하나의 메트릭은 5 개의 시간 소인 값을 리턴 할 수 있습니다. 다른 메트릭은 이전 메트릭과 일치 할 수있는 10 개의 타임 스탬프 값을 반환 할 수 있습니다.

그래서 나는 다음과 같은 CSV를 생성해야 : 값은 쿼리에서 반환

tmestamp,metric1,metric2,tmetric3\n 
0,1,,2\n 
1,,2,\n 
2,1,3,6\n 
3,5,5, \n 
4,,,5\n 

는 10000 개 이상의 데이터 포인트 수 있습니다. 이 문제에 어떻게 접근 할 수 있습니까? 스파크 클러스터에서이 프로그램을 실행할 수 있습니까?

내가 노력 코드 :

package com.example; 
import java.io.FileWriter; 
import java.io.IOException; 
import java.util.ArrayList; 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import org.kairosdb.client.builder.DataPoint; 
public class Test { 
private static Map<MetricMap, String> metricMaps = new HashMap<>(); 

public static void main(String args[]) { 
    Map<String, List<DataPoint>> metriDps = new HashMap<>(); 
    String[] metricNames = new String[] { "m1", "m2", "m3" }; 
    List<DataPoint> dataPoints1 = new ArrayList<DataPoint>(); 
    DataPoint dp1 = new DataPoint(0, 1); 
    DataPoint dp2 = new DataPoint(2, 1); 
    DataPoint dp3 = new DataPoint(3, 5); 
    dataPoints1.add(dp1); 
    dataPoints1.add(dp2); 
    dataPoints1.add(dp3); 
    metriDps.put("m1", dataPoints1); 
    List<DataPoint> dataPoints2 = new ArrayList<DataPoint>(); 
    DataPoint dp21 = new DataPoint(1, 2); 
    DataPoint dp22 = new DataPoint(2, 3); 
    DataPoint dp23 = new DataPoint(3, 5); 
    dataPoints2.add(dp21); 
    dataPoints2.add(dp22); 
    dataPoints2.add(dp23); 
    metriDps.put("m2", dataPoints2); 
    List<DataPoint> dataPoints3 = new ArrayList<DataPoint>(); 
    DataPoint dp31 = new DataPoint(0, 2); 
    DataPoint dp32 = new DataPoint(2, 6); 
    DataPoint dp33 = new DataPoint(4, 5); 
    dataPoints3.add(dp31); 
    dataPoints3.add(dp32); 
    dataPoints3.add(dp33); 
    metriDps.put("m3", dataPoints3); 
    try { 
     FileWriter writer = new FileWriter("/home/lr/Desktop/csv1.csv"); 
     metriDps.keySet().stream().forEach(key -> createMap(metriDps.get(key), key)); 
     String value; 
     for (MetricMap metricMap : metricMaps.keySet()) { 
      String time = metricMap.getTime(); 
      writer.append(time); 
      writer.append(','); 
      for (int i = 0; i < 3; i++) { 
       MetricMap map = new MetricMap(); 
       map.setName(metricNames[i]); 
       map.setTime(time); 
       value = metricMaps.get(map); 
       if (value != null) 
        writer.append(metricMaps.get(map)); 
       else 
        writer.append(""); 
       if (i == 2) 
        writer.append('\n'); 
       else 
        writer.append(','); 
      } 
     } 
     // generate whatever data you want 

     writer.flush(); 
     writer.close(); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } 
} 

private static void createMap(List<DataPoint> list, String key) { 

    MetricMap map = null; 

    for (DataPoint dp : list) { 
     map = new MetricMap(); 
     map.setName(key); 
     map.setTime(String.valueOf(dp.getTimestamp())); 
     metricMaps.put(map, String.valueOf(dp.getValue())); 
    } 

} 

}

난 정말 당신의 도움을 주셔서 감사합니다.

+0

시리즈가 될 수

static class NamedKeeparator implements Iterator<DataPoint> { private final Iterator<DataPoint> delegate; private final String name; private DataPoint current; public NamedKeeparator(String name, Iterator<DataPoint> delegate) { this.delegate = delegate; this.name = name; } @Override public boolean hasNext() { return delegate.hasNext(); } @Override public DataPoint next() { return current = delegate.next(); } public DataPoint current() { return current; } public void consume() { current = null; } String getName() { return name; } } 

다음과 같은 유틸리티 클래스를 사용하십니까? 모든 것을 "병렬로"반복하여 이동 중에도 출력물을 만드는 데 사용할 것입니다. 맵에 모든 것을 저장하는 것은 확장 성을 제한 할 수있는 방법보다 더 많은 메모리를 사용합니다 (처리해야하는 배치 크기를 줄이기 위해 더 작은 시간 조각으로 모든 3을 쿼리 할 수 ​​있음) – zapl

+0

예 시리즈는 시간 순서대로 정렬됩니다. 어떻게 할 수 있는지 보여 주시겠습니까? – lalithark

+0

내가 가지고있는 코드는 CSV 파일에 중복 값을 쓰고 있습니다. 나는 맞았다. – lalithark

답변

2

알고리즘을 작동 시키려면 시간을 키로 지정하고 포인트의 값 + 메트릭 이름을 값으로 매핑해야합니다. 정렬 된 입력 목록과

Map<String, List<DataPoint>> metriDps = new HashMap<>(); 
String[] metricNames = new String[] { 
     "m1", "m2", "m3" 
}; 
List<DataPoint> dataPoints1 = new ArrayList<DataPoint>(); 
dataPoints1.add(new DataPoint(0, 1)); 
dataPoints1.add(new DataPoint(2, 1)); 
dataPoints1.add(new DataPoint(3, 5)); 
metriDps.put("m1", dataPoints1); 

List<DataPoint> dataPoints2 = new ArrayList<DataPoint>(); 
dataPoints2.add(new DataPoint(1, 2)); 
dataPoints2.add(new DataPoint(2, 3)); 
dataPoints2.add(new DataPoint(3, 5)); 
metriDps.put("m2", dataPoints2); 

List<DataPoint> dataPoints3 = new ArrayList<DataPoint>(); 
dataPoints3.add(new DataPoint(0, 2)); 
dataPoints3.add(new DataPoint(2, 6)); 
dataPoints3.add(new DataPoint(4, 5)); 
metriDps.put("m3", dataPoints3); 

SortedMap<Long, Map<String, String>> map = new TreeMap<>(); 
// format: 
// time1 -> [(metricName, value), (metricName, value), ..] 
// time2 -> [(metricName, value), (metricName, value), ..] 
// .. 

metriDps.entrySet().stream() 
     .forEach(entry -> { 
      List<DataPoint> points = entry.getValue(); 
      String metric = entry.getKey(); 
      points.forEach(point -> { 
       Long time = point.getTimestamp(); 
       Object value = point.getValue(); 
       if (value != null) 
        // add (metricName, value) to map stored under time 
        map.computeIfAbsent(time, key -> new HashMap<>()) 
          .put(metric, value.toString()); 
      }); 
     }); 

StringWriter writer = new StringWriter(); 
// header 
writer.append("timestamp,"); 
writer.append(Stream.of(metricNames).collect(Collectors.joining(","))); 
writer.append('\n'); 
// content, sorted map means we can simply iterate it's keys 
map.entrySet().forEach(entry -> { 
    // time 
    writer.append(String.valueOf(entry.getKey())); 
    writer.append(','); 
    // fetch all possible metric names from the map so it prints empty ",," 
    String line = Stream.of(metricNames) 
      .map(entry.getValue()::get) 
      .map(val -> val == null ? "" : val) 
      .collect(Collectors.joining(",")); 
    writer.append(line); 
    writer.append('\n'); 
}); 
System.out.println(writer); 

인쇄

timestamp,m1,m2,m3 
0,1,,2 
1,,2, 
2,1,3,6 
3,5,5, 
4,,,5 

, 그런 다음 하나 (들)을 초기 값으로 그 시점을 발전, 3 반복자를 유지하여 알고리즘을 향상시킬 수 있습니다 : 다음은 있다고한다. 따라서 모든 시리즈를 병렬/병렬로 반복 할 수 있습니다. 이렇게하면 맵을 작성하고 목록을 하나씩 처리 할 필요가 없으므로 메모리를 절약 할 수 있습니다.


잠재적 인 구현이 시간에 따라 정렬됩니다

StringWriter writer = new StringWriter(); 
// header 
writer.append("timestamp,"); 
writer.append(Stream.of(metricNames).collect(Collectors.joining(","))); 
writer.append('\n'); 

List<NamedKeeparator> iterators = metriDps.entrySet().stream() 
     .map(entry -> new NamedKeeparator(entry.getKey(), entry.getValue().iterator())) 
     .collect(Collectors.toList()); 

List<NamedKeeparator> leastIterators = new ArrayList<>(); 
for (;;) { 
    leastIterators.clear(); 
    long leastValue = Long.MAX_VALUE; 
    for (NamedKeeparator iterator : iterators) { 
     // advance until there is some value 
     while (iterator.current() == null && iterator.hasNext()) { 
      iterator.next(); 
     } 
     // build set of iterators pointing to least value 
     if (iterator.current() != null 
       && iterator.current().getTimestamp() <= leastValue) { 
      if (iterator.current().getTimestamp() < leastValue) { 
       leastValue = iterator.current().getTimestamp(); 
       leastIterators.clear(); 
      } 
      leastIterators.add(iterator); 
     } 
    } 
    // nothing -> all iterators done 
    if (leastIterators.isEmpty()) 
     break; 

    // least contains now iterators for the same timestamp 

    // get time from the first 
    long time = leastIterators.get(0).current().getTimestamp(); 
    writer.append(String.valueOf(time)).append(','); 

    // format points 
    String points = Stream.of(metricNames) 
      .map(metric -> leastIterators.stream() 
        .filter(it -> it.getName().equals(metric)).findAny() 
        .map(it -> it.current()).orElse(null)) 
      .map(point -> point != null ? String.valueOf(point.getValue()) : "") 
      .collect(Collectors.joining(",")); 

    writer.append(points).append('\n'); 

    leastIterators.forEach(it -> { 
     it.consume(); 
    }); 
} 
System.out.println(writer); 

http://ideone.com/pVCfNB

+0

좋습니다. 모두 감사합니다. – lalithark