2017-02-27 8 views
0

3 백만 개 이상의 레코드가있는 테이블이 있습니다. 그리고 DB에서 모든 레코드를 읽고, 다른 시스템이 처리 할 수 ​​있도록 kafka 대기열로 처리를 보내야합니다. 그런 다음 출력 카프카 큐의 결과를 읽고 DB에 다시 씁니다.
정상적인 부분을 읽고 쓰고 싶습니다. 그렇지 않으면 즉시 OOM 예외가 발생합니다.Mybatis 일괄 처리

mybatis를 사용하여 일괄 읽기 및 쓰기 작업을 수행하는 데 가능한 기술적 솔루션이 될 수 있습니까?
깔끔한 작업 예제는 많이 감사하겠습니다.

답변

1

나는 카프카에 대해 잘 모르겠으므로 의사 코드를 작성합니다.

처음 읽기 시간에 Mybatis의 기본 동작은 목록에 결과를 반환하는 것이지만 메모리에 3 백만 개의 개체를로드하지 않으려는 것입니다. MyBatis로 전역 설정에 정의 된 값이없는 경우 : (@Option(fetchSize=500) 주석 기반 매퍼를 사용하는 경우) 이것은 또한 명령문의 fetchSize을 설정 org.apache.ibatis.session.ResultHandler<T>

public void handleResult(final ResultContext<YourType> context) { 
    addToKafkaQueue(context.getResultObject()); 
} 

의 사용자 지정 구현을 사용하여 오버라이드 (override) 할 수 있어야합니다. 설정을 해제하면이 옵션은 기본적으로 드라이버 값에 의존하며 모든 DB 공급 업체마다 다릅니다. 이것은 얼마나 많은 레코드가 결과 집합에 즉시 버퍼링 될지 정의합니다. 예 : Oracle의 경우이 값은 10입니다. 일반적으로 앱에서 DB로 작업을 많이 읽으므로 너무 낮습니다. Postgresql의 경우 이것은 무제한이며 (전체 결과 집합), 너무 많습니다. 속도와 메모리 사용간에 올바른 균형을 찾아야합니다. 업데이트에 대한

:

do { 
    YourType object = readFromKafkaQueue(); 
    mybatisMapper.update(object); 
} while (kafkaQueueHasMoreElements()); 
sqlSession.flushStatement(); // only when using ExecutorType.BATCH 

가장 중요한 인 ExecutorType 기본 ExecutorType.SIMPLE 또는 ExecutorType.BATCH에 한 번만 문을 준비하는 대신 모든 반복에 수 중 ExecutorType.REUSE (이 SessionFactory.openSession()에 인수입니다) 그 것이다 명령문을 스택하고 실제로 플러시시에만 실행하십시오.

트랜잭션에 대해 생각해 볼 필요가 있습니다. 3 백만 건의 업데이트가 필요하거나 세그먼트화할 수 있습니다.

0

일괄 처리를 위해 매퍼의 인스턴스를 별도로 만들어야합니다.

도움이 될 수 있습니다.

+0

정보를 단지 링크가 아닌 대답에 추가하십시오. 링크가 사라질 수 있습니다. – user3486184