2017-11-15 10 views
0

에 - 나는 MAP{Hour,MAP{ID,Ratio}} 같은 구조하는 RDD로 저장할이 방법 중첩 된지도 RDD의 내가 같은 텍스트 파일이 불꽃

ID,Hour,Ratio 
100775,0.0,1.0 
100775,1.0,1.0560344797302321 
100775,2.0,1.1333317975785973 
100775,3.0,1.1886133302168074 
100776,4.0,1.2824427440125867 

. 가장 가까운 구조는 JavaPairRDD입니다. 나는 JavaPairRDD{Hour,MAP{ID,Ratio}} 같은 구조를 구현하는, 그러나,이 구조는 내가 본질적으로 가장이를 얻는 방법에 대한 모든 포인터

ratio = MAP.get(Hour).get(ID) 

을 원하는대로, 내 사용 사례가 해결되지 LIST{MAP{ID,RATIO}}을 반환 lookup() 기능을 제공 노력 끝난.

UPDATE : - :

JavaRDD<Map<String,Map<String,String>>> mapRDD = data.map(line -> line.split(",")).map(array-> Collections 
       .singletonMap(array[0], 
       Collections 
       .singletonMap 
       (array[1],array[2]))); 

그러나, 여기에 사용 가능한 기능과 같은 더 조회()이없는, 정확한 -

는 라 메쉬의 대답 후, 나는 다음과 같은 시도?

답변

0

, 나는 다음과 같이 가기로 결정했다 : 주요으로이 예제 시간에, 결과는 뭔가 같은 것 -

가 나는 JavaPairRDD {시간을 만들어, MAP {ID, Ratio}}. 작업이 실행될 때마다 언제든지 해당 시간에만 해당하는지도가 필요합니다. 그래서 나는 다음과 같은 한 : -이 이제 더 방송 변수로 사용될 수

Map<String, Double> result = new HashMap<>(); 
javaRDDPair.lookup(HOUR).stream().forEach(map ->{ 
      result.putAll(map.entrySet().stream().collect(Collectors.toMap(entry-> entry.getKey(), entry-> entry.getValue()))); 
     }); 

.

0

다음은 다음은 다음을 수행 할 수 있습니다 RDD[Map[String, Map[String, String]]]을 필요로하는 경우가

scala> val rdd = sc.textFile("path to the csv file") 
rdd: org.apache.spark.rdd.RDD[String] = path to csv file MapPartitionsRDD[7] at textFile at <console>:24 

scala> val maps = rdd.map(line => line.split(",")).map(array => (array(1), Map(array(0) -> array(2)))).collectAsMap() 
maps: scala.collection.Map[String,scala.collection.immutable.Map[String,String]] = Map(1.0 -> Map(100775 -> 1.0560344797302321), 4.0 -> Map(100776 -> 1.2824427440125867), 0.0 -> Map(100775 -> 1.0), 3.0 -> Map(100775 -> 1.1886133302168074), 2.0 -> Map(100775 -> 1.1333317975785973)) 

을 수행 할 수있는 작업입니다.

scala> val rddMaps = rdd.map(line => line.split(",")).map(array => Map(array(1) -> Map(array(0) -> array(2)))).collect 
rddMaps: Array[scala.collection.immutable.Map[String,scala.collection.immutable.Map[String,String]]] = Array(Map(0.0 -> Map(100775 -> 1.0)), Map(1.0 -> Map(100775 -> 1.0560344797302321)), Map(2.0 -> Map(100775 -> 1.1333317975785973)), Map(3.0 -> Map(100775 -> 1.1886133302168074)), Map(4.0 -> Map(100776 -> 1.2824427440125867))) 

나는 대답은 그것은 불꽃의 데이터 세트로 작업하는 일반적인 문제입니다

+0

도움이 되긴하지만지도를 반환하지만 RDD를 반환하고 싶습니다. – Sanchay

+0

나는 rdd 응답도 포함시켰다. :) –

+0

수집을 사용했기 때문에 혼란스러워했습니다. 당신이 수집을 사용하지 않는다면, 당신은'scala> val으로 rdd를 얻는다. rddMaps = rdd.map (line => line.split (",")). map (array => Map (array (1) -> Map 배열 (0) -> 배열 (2)))) rddMaps : org.apache.spark.rdd.RDD [scala.collection.immutable.Map [String, String]]] = MapPartitionsRDD [17] at map : 26' –

-1

도움이되기를 바랍니다. 일반적으로 각 행이 일부 샘플을 포함하고 각 열은 각 샘플의 기능을 나타내는 데이터 집합이 있습니다. 그러나 공통적 인 문제에 대한 일반적인 해결책은 각 열을 해당 속성으로 지원하고 각 샘플이 RDD 개체가되도록 Entity를 정의하는 것입니다. rdd에서 이러한 각 객체에 액세스하려면 javapairrdd를 사용하여 수행 할 수 있습니다. 내 사용 사례를 들어

Javapairrdd<INTEGER,Entity>