2013-05-06 2 views
4

Mule Studio 3.4.0 Community Edition을 사용하고 있습니다. File Endpoint로 들어오는 큰 CSV 파일을 구문 분석하는 방법에 대해 큰 문제가 있습니다. 시나리오는 3 개의 CSV 파일이 있고 데이터베이스에 files'content를 넣을 것입니다. 그러나 거대한 파일 (약 144MB)을로드하려고하면 "OutOfMemory"예외가 발생합니다. 나는 큰 CSV를 더 작은 크기의 CSV로 나누거나 분할하는 해결책으로 생각했다. (나는이 솔루션이 최고인지는 모르겠다.) 예외를 던지지 않고 CSV를 처리하는 방법을 찾으려고 노력했다.Mule에서 거대한 CSV 파일을 읽는 법

<file:connector name="File" autoDelete="true" streaming="true" validateConnections="true" doc:name="File"/> 

<flow name="CsvToFile" doc:name="CsvToFile"> 
     <file:inbound-endpoint path="src/main/resources/inbox" moveToDirectory="src/main/resources/processed" responseTimeout="10000" doc:name="CSV" connector-ref="File"> 
      <file:filename-wildcard-filter pattern="*.csv" caseSensitive="true"/> 
     </file:inbound-endpoint> 
     <component class="it.aizoon.grpBuyer.AddMessageProperty" doc:name="Add Message Property"/> 
     <choice doc:name="Choice"> 
      <when expression="INVOCATION:nome_file=azienda" evaluator="header"> 
       <jdbc-ee:csv-to-maps-transformer delimiter="," mappingFile="src/main/resources/companies-csv-format.xml" ignoreFirstRecord="true" doc:name="CSV2Azienda"/> 
       <jdbc-ee:outbound-endpoint exchange-pattern="one-way" queryKey="InsertAziende" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database Azienda"> 
        <jdbc-ee:query key="InsertAziende" value="INSERT INTO aw006_azienda VALUES (#[map-payload:AW006_ID], #[map-payload:AW006_ID_CLIENTE], #[map-payload:AW006_RAGIONE_SOCIALE])"/> 
       </jdbc-ee:outbound-endpoint> 
      </when> 
      <when expression="INVOCATION:nome_file=servizi" evaluator="header"> 
       <jdbc-ee:csv-to-maps-transformer delimiter="," mappingFile="src/main/resources/services-csv-format.xml" ignoreFirstRecord="true" doc:name="CSV2Servizi"/> 
       <jdbc-ee:outbound-endpoint exchange-pattern="one-way" queryKey="InsertServizi" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database Servizi"> 
        <jdbc-ee:query key="InsertServizi" value="INSERT INTO ctrl_aemd_unb_servizi VALUES (#[map-payload:CTRL_ID_TIPO_OPERAZIONE], #[map-payload:CTRL_DESCRIZIONE], #[map-payload:CTRL_COD_SERVIZIO])"/> 
       </jdbc-ee:outbound-endpoint> 
      </when> 
      <when expression="INVOCATION:nome_file=richiesta" evaluator="header"> 
       <jdbc-ee:csv-to-maps-transformer delimiter="," mappingFile="src/main/resources/requests-csv-format.xml" ignoreFirstRecord="true" doc:name="CSV2Richiesta"/> 
       <jdbc-ee:outbound-endpoint exchange-pattern="one-way" queryKey="InsertRichieste" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database Richiesta"> 
        <jdbc-ee:query key="InsertRichieste" value="INSERT INTO ctrl_aemd_unb_richiesta VALUES (#[map-payload:CTRL_ID_CONTROLLER], #[map-payload:CTRL_NUM_RICH_VENDITORE], #[map-payload:CTRL_VENDITORE], #[map-payload:CTRL_CANALE_VENDITORE], #[map-payload:CTRL_CODICE_SERVIZIO], #[map-payload:CTRL_STATO_AVANZ_SERVIZIO], #[map-payload:CTRL_DATA_INSERIMENTO])"/> 
       </jdbc-ee:outbound-endpoint> 
      </when> 
     </choice> 
    </flow> 

이 문제를 해결하는 방법을 모르겠다. 미리 도움 주셔서 감사합니다.

답변

3

Steves와 말한 것처럼, csv-to-maps-transformer는이를 처리하기 전에 메모리에 전체 파일을로드 할 수 있습니다. 당신이 할 수있는 일은 CSV 파일을 더 작은 부분들로 나누어서 그 부분들을 VM에 보내어 개별적으로 처리하는 것입니다. 그런 다음

public class CSVReader implements Callable{ 
    @Override 
    public Object onCall(MuleEventContext eventContext) throws Exception { 

     InputStream fileStream = (InputStream) eventContext.getMessage().getPayload(); 
     DataInputStream ds = new DataInputStream(fileStream); 
     BufferedReader br = new BufferedReader(new InputStreamReader(ds)); 

     MuleClient muleClient = eventContext.getMuleContext().getClient(); 

     String line; 
     while ((line = br.readLine()) != null) { 
      muleClient.dispatch("vm://in", line, null); 
     } 

     fileStream.close(); 
     return null; 
    } 
} 

을 스트리밍을 사용하려면 현재 file-connector 구성을 유지 두

<file:connector name="File" 
    workDirectory="yourWorkDirPath" autoDelete="false" streaming="true"/> 

<flow name="CsvToFile" doc:name="Split and dispatch"> 
    <file:inbound-endpoint path="inboxPath" 
     moveToDirectory="processedPath" pollingFrequency="60000" 
     doc:name="CSV" connector-ref="File"> 
     <file:filename-wildcard-filter pattern="*.csv" 
      caseSensitive="true" /> 
    </file:inbound-endpoint> 
    <component class="it.aizoon.grpBuyer.AddMessageProperty" doc:name="Add Message Property" /> 
    <component class="com.dgonza.CSVReader" doc:name="Split the file and dispatch every line to VM" /> 
</flow> 

<flow name="storeInDatabase" doc:name="receive lines and store in database"> 
    <vm:inbound-endpoint exchange-pattern="one-way" 
     path="in" doc:name="VM" /> 
    <Choice> 
     . 
     . 
     Your JDBC Stuff 
     . 
     . 
    <Choice /> 
</flow> 

의 주요 흐름을 분할 : 먼저,이 첫 번째 단계를 달성하기 위해 구성 요소를 만들 수 있습니다. 이 솔루션을 사용하면 전체 파일을 메모리에 먼저로드 할 필요없이 CSV 데이터를 처리 할 수 ​​있습니다. HTH

+0

SteveS와 Daniel 대단히 감사합니다.이 솔루션을 사용해 보겠습니다. –

+0

안녕하세요, 귀하의 스키마를 사용하려고했는데, 내가 몇 가지 행을 삽입 할 수있는 한 번에, 나는이 메시지가 나타납니다 : –

+0

INFO 2013-05-07 18 : 23 : 18,379 [[splitmultithread] .FileSplitter. C : \ workspace_3.4 \ splitmultithread \ src \ main \ resources \ inbox \ richiesta.csv ERROR 2013-05-07 18:24 : org.mule.transport.file.FileMessageReceiver : 00,144 [[splitmultithread] .storeInDatabase.stage1.04] org.mule.processor.AsyncWorkListener : 작업으로 인해 'workCompleted'에서 예외가 발생했습니다. 실행중인 작업은 다음과 같습니다. org.mule.processo[email protected]3bc752 –

1

나는 csv-to-maps-transformer가 전체 파일을 메모리로 강제 전송할 것이라고 생각합니다. 개인적으로 하나의 큰 파일을 다루고 있기 때문에이를 다루기 위해 Java 클래스를 작성하는 경향이 있습니다. 파일 엔드 포인트는 파일 스트림을 사용자 정의 변환기로 전달합니다. 그런 다음 JDBC 연결을 만들고 전체 파일을로드하지 않고 한 번에 한 행 씩 정보를 선택할 수 있습니다. 나는 CSV를 구문 분석하기 위해 OpenCSV을 사용했다. 그래서 자바 클래스는 다음과 같은 것이 포함됩니다 :

protected Object doTransform(Object src, String enc) throws TransformerException { 

    try { 
     //Make a JDBC connection here 

     //Now read and parse the CSV 

     FileReader csvFileData = (FileReader) src; 


     BufferedReader br = new BufferedReader(csvFileData); 
     CSVReader reader = new CSVReader(br); 

     //Read the CSV file and add the row to the appropriate List(s) 
     String[] nextLine; 
     while ((nextLine = reader.readNext()) != null) { 
      //Push your data into the database through your JDBC connection 
     } 
     //Close connection. 

       }catch (Exception e){ 
    } 
+0

먼저 SteveS와 Daniel 모두 도움을 주신 것에 대해 감사드립니다. 나는 당신의 해결책을 시도 할 것이고, 나는 약간의 문제가있을 것이라면 여기에 있기를 바랍니다. 고맙습니다. –