2017-10-27 16 views
0

필자는 10^8 행 수의 스파크 데이터 프레임 df를 가지고 있습니다. 그 데이터 프레임 위에 rowId로 기본 키로 사용하려는 열을 추가했습니다. 나는기존의 데이터 프레임에서 일부 선택된 행 세트를 사용하여 새로운 스파크 데이터 프레임 구성하기

df.withColumn과 같이 명령을 사용하여 동일한 일을했다 ("ROWID", monotonically_increasing_id()) 내가 행의 일부 선택된 수의 새로운 데이터 프레임을 선택하려면 해당 데이터 프레임에서 이제

색인은 이미 목록의 형태로 나에게 알려져있다. 목록에있는 선택된 수의 행을 사용하여 새 데이터 프레임을 만들 수있는 사람이 있다면 도움이됩니다.

답변

0

사용 사례를 이해할 수 있을지 잘 모르겠습니다. monotonically_increasing_id()는 고유 한 ID를 보장하지만 ID가 0 또는 1로 시작하거나 숫자가 연속되도록 보장하지 않습니다. 아래의 예제에서는 데이터 프레임을 다시 분할하여 monotonically_increasing_id()가 연속적인 ID를 보장하지 않는다는 것을 보여줍니다. 어쨌든 드라이버 메모리에 원하는 인덱스 목록이 있다고 가정하면 인덱스 열을 추가 한 후 데이터 프레임에 간단히 참여할 수 있습니다.

//Create a sample dataframe and add rowId column. 
//Note that you may see 0,1,2 as rowIds if you dont repartition. 

val df = List("A","B","C").toDF.repartition(5).withColumn("rowId", monotonically_increasing_id()) 
df.show() 
+-----+------------+ 
|value|  rowId| 
+-----+------------+ 
| A| 8589934592| 
| B| 8589934593| 
| C| 34359738368| 
+-----+------------+ 

//[Option 1] to join with indexes we need to add index column to our DataFrame. Assuming your indexes align with sorted rowId 
val w = org.apache.spark.sql.expressions.Window.orderBy("rowId") 
val result = df.withColumn("index", row_number().over(w) - 1) 

//here is our indexes. let convert it to Dataframe to prepare for join 
val indexes = List(0, 2).toDF 
//finally join 
result.join(indexes, result("index") === indexes("value")).show() 
+-----+-----------+-----+-----+ 
|value|  rowId|index|value| 
+-----+-----------+-----+-----+ 
| A| 8589934592| 0| 0| 
| C|34359738368| 2| 2| 
+-----+-----------+-----+-----+ 


//[Option 2] if your list is small and can easily be sent to all workers, you can also simply filter 
result.filter(result("index").isin(List(0, 2):_*)).show() 
+-----+-----------+-----+ 
|value|  rowId|index| 
+-----+-----------+-----+ 
| A| 8589934592| 0| 
| C|34359738368| 2| 
+-----+-----------+-----+ 
+0

제 제작 환경이 파이썬이므로 제게 pyspark 코드를 제공해 주셔서 감사합니다. –