2017-12-27 22 views
1

제 쿼리가 수백만 행을 반환하는 경우 JdbcIO가 병렬로 쿼리를 실행하는 방법을 알고 싶습니다. https://issues.apache.org/jira/browse/BEAM-2803과 관련된 풀 요청을 언급했습니다. 나는 그것을 완전히 이해할 수 없었다.Apache Beam을 사용하여 데이터베이스에서 대량 데이터 읽기

ReadAllexpand 메서드는 ParDo을 사용합니다. 따라서 병렬로 데이터를 읽으려면 데이터베이스에 대한 다중 연결을 생성합니까? 데이터 소스에서 DB에 연결할 수있는 연결 수를 제한하면 연결 제한에 충실합니까?

아무도 나를 JdbcIO에서 어떻게 처리 할 수 ​​있는지 이해할 수 있도록 도와 주시겠습니까? 나는 2.2.0

업데이트를 사용하고 있습니다 :

.apply(
      ParDo.of(
       new ReadFn<>(
        getDataSourceConfiguration(), 
        getQuery(), 
        getParameterSetter(), 
        getRowMapper()))) 

위의 코드는 ReadFn는 파르 적용되는 것을 보여줍니다. 필자는 ReadFn이 병렬로 실행될 것이라고 생각합니다. 내 가정이 맞는다면, 한 번에 제한된 수의 연결 만 설정할 수있는 DB에서 읽으려면 어떻게 readAll() 메서드를 사용합니까? Balu

답변

0

ReadAll 메쏘드 방법은 당신이 많은 여러 쿼리가있는 경우를 처리

감사합니다. 각 문자열이 쿼리 인 문자열의 PCollection으로 쿼리를 저장할 수 있습니다. 그런 다음 독서시 각 항목은 단일 ParDo에서 별도의 쿼리로 처리됩니다.

작은 수의 쿼리에서는 평행성을 쿼리 수로 제한하기 때문에이 기능이 제대로 작동하지 않습니다. 그러나 당신은 많은 것을 가지고 있다면, 그것은 훨씬 빨리 수행 할 것입니다. 대부분의 ReadAll 호출의 경우입니다.

코드에서 설정 기능에서 작업자별로 연결이 이루어진 것처럼 보입니다. 여기에는 작업자 수 및 쿼리 수에 따라 몇 가지 쿼리가 포함될 수 있습니다.

쿼리 제한은 어디에 설정되어 있습니까? ReadAll의 유무와 유사하게 동작해야합니다.

은 자세한 내용은 JIRA 참조 : https://issues.apache.org/jira/browse/BEAM-2706

내가 jdbcIO 매우 익숙하지 오전하지만이 버전은 JIRA에서 제안 구현처럼 보인다. 여기서 PCollection은 무엇이든 될 수 있고 콜백은 PCollection의 요소에 따라 쿼리를 수정합니다. 이렇게하면 PCollection의 각 항목이 쿼리를 나타내지 만 약간 더 융통성이 있으며 각 요소로 새 쿼리가 생성됩니다.

+0

라라, 댓글 주셔서 감사합니다. 하지만, 내 질문은 단일 쿼리가 DB에서 수백만 행을로드하는 경우에만 해당됩니다. 나는 그러한 데이터를 병렬로 읽는 구현을 보지 못했다. 여기서 우리는 어떻게 병렬 처리를 할 수 있습니까? – Balu

0

다음과 같이 데이터 소스를 만들었습니다.

ComboPooledDataSource cpds = new ComboPooledDataSource(); 
    cpds.setDriverClass("com.mysql.jdbc.Driver"); // loads the jdbc driver 
    cpds.setJdbcUrl("jdbc:mysql://<IP>:3306/employees"); 
    cpds.setUser("root"); 
    cpds.setPassword("root"); 
    cpds.setMaxPoolSize(5); 

지금이 드라이버를 설정하는 더 좋은 방법이 있습니다. 데이터베이스 풀 크기를 5로 설정했습니다. JdbcIO 변환을 수행하는 동안이 데이터 소스를 사용하여 연결을 생성했습니다. 는 파이프 라인에서 나는 약 300 만 기록을 반환하는 쿼리를 사용

option.setMaxNumWorkers(5); 
option.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED); 

을 설정합니다. DB 연결을 관찰하는 동안 프로그램이 실행되는 동안 연결 수가 점차 증가했습니다. 특정 인스턴스에서 최대 5 개의 연결을 사용했습니다. 데이터베이스에서 대량 데이터를로드하기 위해 트랜잭션을 실행하는 동안 DB에 생성 된 연결 수를 제한 할 수있는 방법이라고 생각합니다.
<dependency> 
     <groupId>c3p0</groupId> 
     <artifactId>c3p0</artifactId> 
     <version>0.9.1.2</version> 
    </dependency> 

ComboPoolDataSource

에 대한

메이븐 의존성 ** 내가 여기서 뭔가를 놓친 경우 답을 수정 해 주시기 바랍니다. *