2017-12-23 42 views
0

대용량 데이터 집합을 병렬 처리하기위한 JavaSE8 응용 프로그램이 있습니다. 하나의 압축 파일로 직렬화하고자하는 1M 개체를 생성 중입니다. 파일은 웹 앱에서 다운로드/업로드됩니다. 병렬 처리가 잘 최적화되어 있습니다. 그러나 직렬화/압축은 순차적으로 수행되며 내 응용 프로그램의 병목 현상입니다.Java 병렬 직렬화 및 압축

다른 솔루션을 테스트했습니다 : Kryo, ChronicleMap ... 이제 Kryo와 Bz2 압축을 사용하고 있습니다. 효과가 있습니다. 그러나 성능이 충분하지 않습니다.

병렬 직렬화 및 압축에 대한 해결책을 찾을 수 없습니다. 이 점에 관한 모든 정보는 환영합니다

답변

1

데이터 집합을 병렬 또는 순차적으로 처리하는 방법과는 관계가 없으므로 명확한 디자인이 필요합니다. 직렬화는 항상 출력 스트림, 소켓 등의 순차적 특성 때문에 발생합니다.) 작업을 수행하고 데이터 집합 처리를 유지합니다. 직렬화 된 데이터 세트를 직렬화하여 파일, 연결 또는 원시 메모리에 저장하려는 경우 동시적인 경쟁 및 원하지 않는 수정으로부터 데이터를 보호 할 장벽을 정의해야합니다.

확실한 것은 각 작업 스레드가 http 서버와 같은 데이터 자체를 직렬화하는 경우가 있지만 여기에서는 병렬 처리되고 마지막으로 직렬화되는 단일 데이터 세트에 대해 이야기하고 있습니다.

위의 설명에 따르면 대답하기에 적합한 코드라고 생각합니다. 표준 Java 직렬화 + GZIP 압축을 사용합니다. 이 코드에서 직렬화 및/또는 압축을 쉽게 대체하고 현재 솔루션과 벤치 마크 할 수 있습니다.

package com.example.demo; 

import java.io.*; 
import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.List; 
import java.util.zip.GZIPInputStream; 
import java.util.zip.GZIPOutputStream; 

import static java.lang.String.format; 

public final class ParallelObjectsSerialization { 

    private static final int ONE_MILLION = 1_000_000; 
    private static final String SERIALIZE_FILE = "/tmp/out.bin"; 

    public static void main(String[] args) throws IOException, ClassNotFoundException { 
//  List<Player> players = parallelGenerate1MPlayers(); 
     List<Player> players = seqGenerate1MPlayers(); 
     serialize(players); 
     players.clear(); 
     players = deserialize(); 
    } 

    private static List<Player> deserialize() throws IOException, ClassNotFoundException { 
     long started = System.currentTimeMillis(); 
     List<Player> players = new ArrayList<>(); 
     try (ObjectInputStream in = new ObjectInputStream(new GZIPInputStream(new FileInputStream(SERIALIZE_FILE)))) { 
      for (int i = 0; i < ONE_MILLION; i++) { 
       players.add((Player) in.readObject()); 
      } 
     } 
     long time = System.currentTimeMillis() - started; 
     System.out.println(format("deserialization of %d objects took %d ms", players.size(), time)); 
     return players; 
    } 

    private static final class Player implements Serializable { 
     private final String name; 
     private final int level; 

     private Player(String name, int level) { 
      this.name = name; 
      this.level = level; 
     } 
    } 

    private static List<Player> seqGenerate1MPlayers() { 
     long started = System.currentTimeMillis(); 
     List<Player> players = new ArrayList<>(ONE_MILLION); 
     for (int i = 0; i < ONE_MILLION; i++) { 
      players.add(new Player(randomName(i), i)); 
     } 
     long time = System.currentTimeMillis() - started; 
     System.out.println(format("sequential generating of %d objects took %d ms", players.size(), time)); 
     return players; 
    } 

    private static List<Player> parallelGenerate1MPlayers() { 
     long started = System.currentTimeMillis(); 
     Player[] players = new Player[ONE_MILLION]; 
     Arrays.parallelSetAll(players, (i) -> new Player(randomName(i), i)); 
     long time = System.currentTimeMillis() - started; 
     System.out.println(format("parallel generating of %d objects took %d ms", players.length, time)); 
     return Arrays.asList(players); 
    } 

    private static void serialize(List<Player> players) throws IOException { 
     long started = System.currentTimeMillis(); 
     try (ObjectOutputStream out = new ObjectOutputStream(new GZIPOutputStream(new FileOutputStream(SERIALIZE_FILE)))) { 
      for (Player player : players) { 
       out.writeObject(player); 
      } 
     } 
     long time = System.currentTimeMillis() - started; 
     System.out.println(format("serialization of %d objects took %d ms", players.size(), time)); 
    } 

    private static String randomName(int seed) { 
     StringBuilder builder = new StringBuilder(); 
     double chance = 30.0; 
     for (char c = 'a'; c <= 'z'; c++) { 
      if (Math.random() * 100.0 <= chance) { 
       builder.append(c); 
       if (builder.length() == 7) { 
        break; 
       } 
      } 
     } 
     if (builder.length() == 0) { 
      builder.append("unknown").append(seed); 
     } 
     return builder.toString(); 
    } 
} 
+0

감사합니다. Alexander. 이 솔루션을 이미 테스트했습니다. 직렬화가 순차적으로 수행되어야하므로 전체 프로세스에서 병목 현상이 발생합니다. – kem

+0

분할 컬렉션을 여러 개의 작은 컬렉션으로 나누어 병렬 처리하고 zip 또는 tar 파일로 그룹화 하시겠습니까? – kem

+1

나는이 경우에'''tar''가 병목이 될까 봐 걱정됩니다. 내 게시물 - 마지막 단계에서 언급 한대로 항상 순차적 인 작업이 될 것입니다. 그러나 직렬화가 CPU 바운드이고 개체 직렬화 중에 CPU 사용률에 문제가 나타나는 경우'''tar''는 작은 일련의 청크에서 작동하지만 디스크 I/O 포화에주의하십시오. –