2017-10-26 19 views
4

프로젝트에서 작업 중이고 다음과 같은 경우에 멈추었습니다.Apache Spark를 사용하여 테이블에 다중 삽입

나는 테이블이 있습니다 superMerge (ID, 이름, 급여)

을 나는이 개 다른 테이블이 : 표 1 및 표 2

모든 테이블 (표 1, 표 2 및 superMerge)와 동일한 구조를 가지고 있습니다.

이제는 table1과 table2의 superMerge 테이블을 삽입/업데이트하는 것이 좋습니다. table1은 매 10 분마다 table2가 20 분마다 업데이트되므로 시간 t = 20 분에 동일한 작업 (이 경우 superMerge)을 시도하는 작업이 2 개 있습니다.

이 병렬 삽입/업데이트를 어떻게 수행 할 수 있는지 알고 싶습니다. spark 또는 다른 hadoop 응용 프로그램을 사용하여 superMerge 테이블에 병합하십시오.

+0

테이블에 대해 더 자세히 설명해 주실 수 있습니까? superMerge로 무엇을하려고합니까? 끊임없이 table1과 table2를 같은 위치에 붙인 다음, 필요할 때 그 위치에서 superMerge를 읽는 것이 잘못된 이유는 무엇입니까? – ayplam

+0

superMerge는 일반 테이블 대신 테이블 1과 2의 뷰일 수 있으므로 superMerge를 업데이트 할 필요가 없습니다. –

+1

@ ArthurJulião가보기를 만드는 것은 옵션이며 나는 직접적인 해결책이 없다는 것을 실제로 알게 될 것입니다. 하지만 나는 아마 스파크 또는 피닉스가 제공하는 솔루션을 찾고 있는데, 나는 그것을 모르고있다. – GKV

답변

5

여기에서 문제는 두 작업이 서로 통신 할 수없고 다른 작업이 무엇을하고 있는지를 모르는 것입니다.

  • 각 작업은 HDFS의 특정 폴더에 (빈) 파일을 만듭니다 업데이트/삽입이 진행하고 제거합니다에 있음을 나타내는 : 비교적 쉬운 솔루션은 기본 파일 기반 "잠금"시스템을 구현하는 whould 작업이 완료되면 해당 파일

  • 이제 각 작업은 업데이트/삽입을 시작하기 전에 이러한 파일이 존재하는지 여부를 확인해야합니다. 파일이 존재할 때까지 작업은 파일이 없어 질 때까지 대기해야합니다.

1

작업 코드 1을 제어 할 수 있습니까? & job2? 어떻게 스케줄링합니까?

일반적으로 두 작업을 10 분마다 1로 변환 할 수 있습니다. 20 분 안에이 통합 작업은 다른 모드 (2 개의 테이블에서 병합)로 실행되지만 기본 모드는 1 개의 테이블에서만 병합됩니다. 그래서 같은 드라이버를 가지고있을 때 두 작업 (예 : 잠금)간에 동기화가 필요 없습니다. 이 솔루션은 작업이 10 분 미만으로 완료된다고 가정합니다.

1

데이터 세트의 크기는 얼마나됩니까? Batch (스파크)로 할 계획입니까, 아니면 삽입/업데이트를 스트리밍 할 수 있습니까 (스파크 스트리밍)?

  • 시작 하나의 작업을 두 테이블을 처리 할 수있는 10 분마다 :

    는 일괄 적으로 그것을한다고 가정 할 수 있습니다. 만약 당신이 표 1과 표 2를 가지고 있다면, 연합을하고 슈퍼 머지와 합류하십시오. 이고르 버먼 (Igor Berman)이 제안했듯이.

  • superMerge 테이블이 커질수록 조인 시간이 오래 걸릴 수 있으므로주의하십시오.
0

이 상황에 직면하게되면 tb1 DF1을 위치 1에, tb2 DF2를 위치 2에 쓰고 끝에는 슈퍼 병합 테이블로 경로를 전환하면 테이블을 삽입 할 수 있지만 소비하는 것은 가능합니다. 하이브에서 특히 많은 런타임.준비 위치에 덮어

는 LOCATION1 위치 2 : 슈퍼 병합 테이블

df1.write.mode("overwrite").partitionBy("partition").parquet(location1) 

df2.write.mode("overwrite").partitionBy("partition").parquet(location2) 

스위칭 경로 :

hiveContext.sql(alter table super_merge_table add if not exists partition(partition=x); LOAD DATA INPATH 'location1/partition=x/' INTO TABLE super_merge_table partition(partition=x))" 

hiveContext.sql(alter table super_merge_table add if not exists partition(partition=x); LOAD DATA INPATH 'location2/partition=x/' INTO TABLE super_merge_table partition(partition=x))" 

당신은 병렬로 병합 할 수 다른에서 하나를 오버라이드하지 않고.