하나의 접근법은 "각 고유 값에 대해 인접한 모든 시간 범위를 찾아 값을 합친 것"으로 문제를 표현하는 것입니다. 이러한 이해를 통해 groupBy
값을 사용하여 각 값에 start
및 end
의 목록을 만들 수 있습니다. 그런 다음 사용자 정의 함수를 사용하여 이러한 함수를 연속 시간 범위로 축소 할 수 있습니다.
극단적으로 데이터 세트에서 디스크 전용 지속성 레벨을 사용하는 경우 유일한 요구 사항은 메모리에 start_end
초의 단일 행을 넣을 수 있어야한다는 것입니다. 따라서 대부분의 클러스터에 대해이 접근법의 상한선을 start_end
쌍의 값으로 설정합니다. 여기
가 구현 한 예이다 (요청에 따라 자바 API를 사용하여 - 스칼라는 상당히 덜 장황 것) :
public class JavaSparkTest {
public static void main(String[] args){
SparkSession session = SparkSession.builder()
.appName("test-changes-in-time")
.master("local[*]")
.getOrCreate();
StructField start = createStructField("start", DataTypes.IntegerType, false);
StructField end = createStructField("end", DataTypes.IntegerType, false);
StructField value = createStructField("value", DataTypes.IntegerType, false);
StructType inputSchema = createStructType(asList(start,end,value));
StructType startEndSchema = createStructType(asList(start, end));
session.udf().register("collapse_timespans",(WrappedArray<Row> startEnds) ->
JavaConversions.asJavaCollection(startEnds).stream()
.sorted((a,b)->((Comparable)a.getAs("start")).compareTo(b.getAs("start")))
.collect(new StartEndRowCollapsingCollector()),
DataTypes.createArrayType(startEndSchema)
);
Dataset<Row> input = session.createDataFrame(asList(
RowFactory.create(123, 124, 1),
RowFactory.create(124, 128, 1),
RowFactory.create(128, 300, 2),
RowFactory.create(300, 400, 2),
RowFactory.create(400, 500, 3),
RowFactory.create(500, 600, 3),
RowFactory.create(600, 700, 3)
), inputSchema);
Dataset<Row> startEndByValue = input.selectExpr("(start start, end end) start_end", "value");
Dataset<Row> startEndsByValue = startEndByValue.groupBy("value").agg(collect_list("start_end").as("start_ends"));
Dataset<Row> startEndsCollapsed = startEndsByValue.selectExpr("value", "explode(collapse_timespans(start_ends)) as start_end");
Dataset<Row> startEndsInColumns = startEndsCollapsed.select("value", "start_end.start", "start_end.end");
startEndsInColumns.show();
}
public static class StartEndRowCollapsingCollector implements Collector<Row, List<Row>, List<Row>>{
@Override
public Supplier<List<Row>> supplier() {
return()-> new ArrayList<Row>();
}
@Override
public BiConsumer<List<Row>, Row> accumulator() {
return (rowList, row) -> {
// if there's no rows in the list or the start doesn't match the current end
if(rowList.size()==0 ||
!rowList.get(rowList.size()-1).getAs(1).equals(row.getAs(0))){
rowList.add(row);
} else {
Row lastRow = rowList.remove(rowList.size()-1);
rowList.add(RowFactory.create(lastRow.getAs(0), row.getAs(1)));
}
};
}
@Override
public BinaryOperator<List<Row>> combiner() {
return (a,b)->{ throw new UnsupportedOperationException();};
}
@Override
public Function<List<Row>, List<Row>> finisher() {
return i->i;
}
@Override
public Set<Characteristics> characteristics() {
return Collections.EMPTY_SET;
}
}
}
그리고 프로그램 출력 :
+-----+-----+---+
|value|start|end|
+-----+-----+---+
| 1| 123|128|
| 3| 400|700|
| 2| 128|400|
+-----+-----+---+
공지 값이 없습니다 순서대로. 이는 스파크가 데이터 세트를 분할하고 값 행을 처리했기 때문이며 행 순서에 어떤 중요성도 지정하지 않도록했기 때문입니다. 당신은 시간 또는 값을 정렬 된 출력을 요구해야합니다. 물론 평소와 같이 정렬 할 수 있습니다.
예를 들어, 지연을 사용하여 마지막 값을 얻은 다음 현재 및 최종 값이 다른지 여부에 따라 데이터 세트를 필터링하십시오. –
만약 다른 행이 130 200 1이라면 출력은 무엇입니까 –
우리는 창 함수를 사용할 수 있지만 스파크는 시계열 데이터를 처리하는데 최적이 아닙니다. 선택의 여지가없는 경우 https : // github .com/sryza/spark-timeseries – SanthoshPrasad