2017-10-06 6 views
0

Pyspark를 사용하여 Spark DataFrame에 새로운 열을 만들려고합니다.이 열은 교대로 부울 값 그룹을 기반으로하는 자동 증가 (또는 ID)를 나타냅니다.값 그룹을 번갈아 표시하는 Pyspark 자동 증분

df.show() 
+-----+------------+-------------+ 
|id |par_id  |is_on  | 
+-----+------------+-------------+ 
|40002|1   |true   | 
|40003|2   |true   | 
|40004|null  |false  | 
|40005|17   |true   | 
|40006|2   |true   | 
|40007|17   |true   | 
|40008|240   |true   | 
|40009|1861  |true   | 
|40010|1862  |true   | 
|40011|2   |true   | 
|40012|null  |false  | 
|40013|1863  |true   | 
|40014|626   |true   | 
|40016|208   |true   | 
|40017|2   |true   | 
|40018|null  |false  | 
|40019|2   |true   | 
|40020|1863  |true   | 
|40021|2   |true   | 
|40022|2   |true   | 
+-----+------------+-------------+ 

내가 is_on 속성을 사용하여 id2라고 증분 ID로이 DataFrame을 확장 할 : 나는 다음과 같은 DataFrame이 있다고 할 수 있습니다. 즉, 부울 값의 각 그룹은 증가하는 ID를 가져야합니다. 결과 데이터 프레임은 다음과 같아야합니다.

df.show() 
+-----+------------+-------------+-----+ 
|id |par_id  |is_on  |id2 | 
+-----+------------+-------------+-----+ 
|40002|1   |true   |1 | 
|40003|2   |true   |1 | 
|40004|null  |false  |2 | 
|40005|17   |true   |3 | 
|40006|2   |true   |3 | 
|40007|17   |true   |3 | 
|40008|240   |true   |3 | 
|40009|1861  |true   |3 | 
|40010|1862  |true   |3 | 
|40011|2   |true   |3 | 
|40012|null  |false  |4 | 
|40013|1863  |true   |5 | 
|40014|626   |true   |5 | 
|40016|208   |true   |5 | 
|40017|2   |true   |5 | 
|40018|null  |false  |6 | 
|40019|2   |true   |7 | 
|40020|1863  |true   |7 | 
|40021|2   |true   |7 | 
|40022|2   |true   |7 | 
+-----+------------+-------------+-----+ 

어떤 제안이 있습니까? 이를 위해 어떻게 사용자 정의 함수를 작성할 수 있습니까?

답변

-1
 #this is python spark testing file 

     from pyspark.sql import SparkSession 
     from pyspark.sql.functions import count, col, udf, struct 
     from pyspark.sql.functions import * 
     from pyspark.sql.types import * 

     spark=SparkSession.builder.master("local").appName("durga prasad").config("spark.sql.warehouse.dir","/home/hadoop/spark-2.0.1-bin-hadoop2.7/bin/test_warehouse").getOrCreate() 
     df=spark.read.csv("/home/hadoop/stack_test.txt",sep=",",header=True) 


     # This is udf 

     count=1 # these variable is changed based on function call 
     prStr='' # these variable is changed based on function call 
     def test_fun(str): 
      global count 
      global prStr 
      if str=="false": 
      count=count + 1 
      prStr=str 
      return count 
      if str=="true" and prStr =='false': 
      count=count + 1 
      prStr=str 
      return count 
      elif str=='true': 
      count=count 
      prStr=str 
      return count 
     # udf function end 


     testUDF = udf(test_fun, StringType()) # register udf 
     df.select("id","par_id","is_on",testUDF('is_on').alias("id2")).show() 


     ####output 
       +-----+------+-----+---+ 
       | id|par_id|is_on|id2| 
       +-----+------+-----+---+ 
       |40002|  1| true| 1| 
       |40003|  2| true| 1| 
       |40004| null|false| 2| 
       |40005| 17| true| 3| 
       |40006|  2| true| 3| 
       |40007| 17| true| 3| 
       |40008| 240| true| 3| 
       |40009| 1861| true| 3| 
       |40010| 1862| true| 3| 
       |40011|  2| true| 3| 
       |40012| null|false| 4| 
       |40013| 1863| true| 5| 
       |40014| 626| true| 5| 
       |40016| 208| true| 5| 
       |40017|  2| true| 5| 
       |40018| null|false| 6| 
       |40019|  2| true| 7| 
       |40020| 1863| true| 7| 
       |40021|  2| true| 7| 
       |40022|  2| true| 7| 
       +-----+------+-----+---+