많은 작업을 수행하기 위해 스트리밍 데이터 프레임을 일반 데이터 프레임으로 변환하고 싶습니다. count.distinct 실시간 분석을위한 복잡한 쿼리.스트리밍 DataFrame을 일반 배치 DataFrame으로 변환하는 방법?
spark에서 정상적인 데이터 프레임으로 스트리밍 데이터 프레임을 변환하는 것에 대해 알고 싶다면 제안하십시오.
많은 작업을 수행하기 위해 스트리밍 데이터 프레임을 일반 데이터 프레임으로 변환하고 싶습니다. count.distinct 실시간 분석을위한 복잡한 쿼리.스트리밍 DataFrame을 일반 배치 DataFrame으로 변환하는 방법?
spark에서 정상적인 데이터 프레임으로 스트리밍 데이터 프레임을 변환하는 것에 대해 알고 싶다면 제안하십시오.
최선의 방법은 Sink 사용자 정의 스트리밍을 작성하고 DataFrame
모든 배치를 addBatch
에 액세스하는 것입니다.
Sink
은 매우 짧기 때문에 여기에 scaladoc과 코드를 인용합니다.
/**
* An interface for systems that can collect the results of a streaming query. In order to preserve
* exactly once semantics a sink must be idempotent in the face of multiple attempts to add the same
* batch.
*/
trait Sink {
/**
* Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
* this method is called more than once with the same batchId (which will happen in the case of
* failures), then `data` should only be added once.
*
* Note 1: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
* Otherwise, you may get a wrong result.
*
* Note 2: The method is supposed to be executed synchronously, i.e. the method should only return
* after data is consumed by sink successfully.
*/
def addBatch(batchId: Long, data: DataFrame): Unit
}
도 읽을 StreamSinkProvider.
로컬 또는 카프카에 스트리밍 데이터 프레임을 저장하십시오. 그리고 로컬 또는 카프카에서 배치 모드로 읽으십시오.
로컬에서? 어떻게 생겼어? "소스"를 가리 키도록주의를 기울 이겠습니까? –