2017-05-05 5 views
1

데이터 프레임을 사용하여 PySpark에서 여분의 벡터를 만드는 효율적인 방법을 찾고 싶습니다. 위로 요약 형식으로스파 스 벡터 pyspark

df = spark.createDataFrame([ 
    (0, "a"), 
    (1, "a"), 
    (1, "b"), 
    (1, "c"), 
    (2, "a"), 
    (2, "b"), 
    (2, "b"), 
    (2, "b"), 
    (2, "c"), 
    (0, "a"), 
    (1, "b"), 
    (1, "b"), 
    (2, "cc"), 
    (3, "a"), 
    (4, "a"), 
    (5, "c") 
], ["id", "category"]) 
+---+--------+ 
| id|category| 
+---+--------+ 
| 0|  a| 
| 1|  a| 
| 1|  b| 
| 1|  c| 
| 2|  a| 
| 2|  b| 
| 2|  b| 
| 2|  b| 
| 2|  c| 
| 0|  a| 
| 1|  b| 
| 1|  b| 
| 2|  cc| 
| 3|  a| 
| 4|  a| 
| 5|  c| 
+---+--------+ 

:

:

df.groupBy(df["id"],df["category"]).count().show() 
+---+--------+-----+ 
| id|category|count| 
+---+--------+-----+ 
| 1|  b| 3| 
| 1|  a| 1| 
| 1|  c| 1| 
| 2|  cc| 1| 
| 2|  c| 1| 
| 2|  a| 1| 
| 1|  a| 1| 
| 0|  a| 2| 
+---+--------+-----+ 

내 목표는 ID로이 출력을 얻을 수 있습니다

트랜잭션 입력 주어의 말합시다

+---+-----------------------------------------------+ 
| id|          feature | 
+---+-----------------------------------------------+ 
| 2|SparseVector({a: 1.0, b: 3.0, c: 1.0, cc: 1.0})| 

올바른 방향으로 나를 가르쳐 주시겠습니까? Java에서 mapreduce를 사용하면 나에게 더 쉬운 것처럼 보였습니다.

답변

4

.

pivoted = df.groupBy("id").pivot("category").count().na.fill(0) 

을 조립 : pivot으로 집계 교체

from pyspark.ml.feature import VectorAssembler 

input_cols = [x for x in pivoted.columns if x != id] 

result = (VectorAssembler(inputCols=input_cols, outputCol="features") 
    .transform(pivoted) 
    .select("id", "features")) 

을 다음과 같이 결과가되고 함께.이 희소성에 따라보다 효율적인 표현을 선택합니다 :

+---+---------------------+ 
|id |features    | 
+---+---------------------+ 
|0 |(5,[1],[2.0])  | 
|5 |(5,[0,3],[5.0,1.0]) | 
|1 |[1.0,1.0,3.0,1.0,0.0]| 
|3 |(5,[0,1],[3.0,1.0]) | 
|2 |[2.0,1.0,3.0,1.0,1.0]| 
|4 |(5,[0,1],[4.0,1.0]) | 
+---+---------------------+ 

하지만 물론 당신은 여전히 ​​하나의 표현으로 변환 할 수 있습니다 :

from pyspark.ml.linalg import SparseVector, VectorUDT 
import numpy as np 

def to_sparse(c): 
    def to_sparse_(v): 
     if isinstance(v, SparseVector): 
      return v 
     vs = v.toArray() 
     nonzero = np.nonzero(vs)[0] 
     return SparseVector(v.size, nonzero, vs[nonzero]) 
    return udf(to_sparse_, VectorUDT())(c) 
+---+-------------------------------------+ 
|id |features        | 
+---+-------------------------------------+ 
|0 |(5,[1],[2.0])      | 
|5 |(5,[0,3],[5.0,1.0])     | 
|1 |(5,[0,1,2,3],[1.0,1.0,3.0,1.0])  | 
|3 |(5,[0,1],[3.0,1.0])     | 
|2 |(5,[0,1,2,3,4],[2.0,1.0,3.0,1.0,1.0])| 
|4 |(5,[0,1],[4.0,1.0])     | 
+---+-------------------------------------+ 
2

데이터 프레임을 RDD로 변환하는 경우 mapreduce와 유사한 프레임 워크 reduceByKey을 따르십시오. 여기에 유일한 까다로운 부분은 (스파 스 벡터에 필요한) 데이터

from pyspark.ml.feature import StringIndexer 
from pyspark.ml.linalg import Vectors 
df = sqlContext.createDataFrame([ 
    (0, "a"), 
    (1, "a"), 
    (1, "b"), 
    (1, "c"), 
    (2, "a"), 
    (2, "b"), 
    (2, "b"), 
    (2, "b"), 
    (2, "c"), 
    (0, "a"), 
    (1, "b"), 
    (1, "b"), 
    (2, "cc"), 
    (3, "a"), 
    (4, "a"), 
    (5, "c") 
], ["id", "category"]) 

범주에 대한 수치 적 표현을 만들기를 만들고, 불꽃의 스파 스 벡터에 대한

가져 오기 패키지 날짜를 포맷하는 것입니다

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex") 
df = indexer.fit(df).transform(df) 

그룹 색인에 따라 카운트 받기

df = df.groupBy(df["id"],df["categoryIndex"]).count() 

rdd로 변환하고 데이터를 매핑하십시오. ID &의 키 - 값 쌍에 해당 ID

rdd = rdd.reduceByKey(lambda a, b: a + b) 

위해 [categoryIndex 카운트] 모두의 ID &리스트의 키 값 쌍을 얻을

rdd = df.rdd.map(lambda x: (x.id, [(x.categoryIndex, x['count'])])) 

키로 감소 [categoryIndex는 COUNT]는

rdd = rdd.map(lambda x: (x[0], Vectors.sparse(len(x[1]), x[1]))) 

dataframe로 다시 변환 스파 스 벡터에 각 ID에 대해 [categoryIndex 카운트] 모든 목록을 변환 할 데이터를지도 15,finalDf = sqlContext.createDataFrame (RDD [ 'ID', '기능'])

데이터 체크

pivot VectorAssembler와 함께 아주 쉽게 수행 할 수
finalDf.take(5) 

[Row(id=0, feature=SparseVector(1, {1: 2.0})), 
    Row(id=1, feature=SparseVector(3, {0: 3.0, 1: 1.0, 2: 1.0})), 
    Row(id=2, feature=SparseVector(4, {0: 3.0, 1: 1.0, 2: 1.0, 3: 1.0})), 
    Row(id=3, feature=SparseVector(1, {1: 1.0})), 
    Row(id=4, feature=SparseVector(1, {1: 1.0}))]