사용 사례를 이해할 수 있을지 잘 모르겠습니다. monotonically_increasing_id()는 고유 한 ID를 보장하지만 ID가 0 또는 1로 시작하거나 숫자가 연속되도록 보장하지 않습니다. 아래의 예제에서는 데이터 프레임을 다시 분할하여 monotonically_increasing_id()가 연속적인 ID를 보장하지 않는다는 것을 보여줍니다. 어쨌든 드라이버 메모리에 원하는 인덱스 목록이 있다고 가정하면 인덱스 열을 추가 한 후 데이터 프레임에 간단히 참여할 수 있습니다.
//Create a sample dataframe and add rowId column.
//Note that you may see 0,1,2 as rowIds if you dont repartition.
val df = List("A","B","C").toDF.repartition(5).withColumn("rowId", monotonically_increasing_id())
df.show()
+-----+------------+
|value| rowId|
+-----+------------+
| A| 8589934592|
| B| 8589934593|
| C| 34359738368|
+-----+------------+
//[Option 1] to join with indexes we need to add index column to our DataFrame. Assuming your indexes align with sorted rowId
val w = org.apache.spark.sql.expressions.Window.orderBy("rowId")
val result = df.withColumn("index", row_number().over(w) - 1)
//here is our indexes. let convert it to Dataframe to prepare for join
val indexes = List(0, 2).toDF
//finally join
result.join(indexes, result("index") === indexes("value")).show()
+-----+-----------+-----+-----+
|value| rowId|index|value|
+-----+-----------+-----+-----+
| A| 8589934592| 0| 0|
| C|34359738368| 2| 2|
+-----+-----------+-----+-----+
//[Option 2] if your list is small and can easily be sent to all workers, you can also simply filter
result.filter(result("index").isin(List(0, 2):_*)).show()
+-----+-----------+-----+
|value| rowId|index|
+-----+-----------+-----+
| A| 8589934592| 0|
| C|34359738368| 2|
+-----+-----------+-----+
제 제작 환경이 파이썬이므로 제게 pyspark 코드를 제공해 주셔서 감사합니다. –