2017-05-15 16 views
1

json 파일의 데이터를 파싱하는 동안 UDF로 사용될 클래스에서 멤버 함수를 정의하려고합니다. 나는 특성을 사용하여 메소드 세트와 그 메소드를 대체하는 클래스를 정의합니다.scala 클래스 멤버 함수가 UDF

trait geouastr { 
    def getGeoLocation(ipAddress: String): Map[String, String] 
    def uaParser(ua: String): Map[String, String] 
} 

class GeoUAData(appName: String, sc: SparkContext, conf: SparkConf, combinedCSV: String) extends geouastr with Serializable { 
    val spark = SparkSession.builder.config(conf).getOrCreate() 
    val GEOIP_FILE_COMBINED = combinedCSV; 

    val logger = LogFactory.getLog(this.getClass) 
    val allDF = spark. 
    read. 
    option("header","true"). 
    option("inferSchema", "true"). 
    csv(GEOIP_FILE_COMBINED).cache 

    val emptyMap = Map(
    "country" -> "", 
    "state" -> "", 
    "city" -> "", 
    "zipCode" -> "", 
    "latitude" -> 0.0.toString(), 
    "longitude" -> 0.0.toString()) 

    override def getGeoLocation(ipAddress: String): Map[String, String] = { 
    val ipLong = ipToLong(ipAddress) 
    try { 
     logger.error("Entering UDF " + ipAddress + " allDF " + allDF.count()) 
     val resultDF = allDF. 
     filter(allDF("network").cast("long") <= ipLong.get). 
     filter(allDF("broadcast") >= ipLong.get). 
     select(allDF("country_name"), allDF("subdivision_1_name"),allDF("city_name"), 
      allDF("postal_code"),allDF("latitude"),allDF("longitude")) 
      val matchingDF = resultDF.take(1) 
      val matchRow = matchingDF(0) 
     logger.error("Lookup for " + ipAddress + " Map " + matchRow.toString()) 
     val geoMap = Map(
      "country" -> nullCheck(matchRow.getAs[String](0)), 
      "state" -> nullCheck(matchRow.getAs[String](1)), 
      "city" -> nullCheck(matchRow.getAs[String](2)), 
      "zipCode" -> nullCheck(matchRow.getAs[String](3)), 
      "latitude" -> matchRow.getAs[Double](4).toString(), 
      "longitude" -> matchRow.getAs[Double](5).toString()) 
     } catch { 
     case (nse: NoSuchElementException) => { 
      logger.error("No such element", nse) 
      emptyMap 
     } 
     case (npe: NullPointerException) => { 
      logger.error("NPE for " + ipAddress + " allDF " + allDF.count(),npe) 
      emptyMap 
     } 
     case (ex: Exception) => { 
      logger.error("Generic exception " + ipAddress,ex) 
      emptyMap 
     } 
     } 
    } 

    def nullCheck(input: String): String = { 
    if(input != null) input 
    else "" 
    } 

    override def uaParser(ua: String): Map[String, String] = { 
    val client = Parser.get.parse(ua) 
    return Map(
     "os"->client.os.family, 
     "device"->client.device.family, 
     "browser"->client.userAgent.family) 
    } 

    def ipToLong(ip: String): Option[Long] = { 
    Try(ip.split('.').ensuring(_.length == 4) 
     .map(_.toLong).ensuring(_.forall(x => x >= 0 && x < 256)) 
     .zip(Array(256L * 256L * 256L, 256L * 256L, 256L, 1L)) 
     .map { case (x, y) => x * y } 
     .sum).toOption 
    } 
} 

getGeoLocation이 (NPE로 실행 중) emptyMap을 리턴하는 동안 uaParser가 정상적으로 작동하는 것을 확인할 수 있습니다. main 메소드에서 이것을 어떻게 사용하는지 보여주는 스 니펫 추가하기.

val appName = "SampleApp" 
    val conf: SparkConf = new SparkConf().setAppName(appName) 
    val sc: SparkContext = new SparkContext(conf) 
    val spark = SparkSession.builder.config(conf).enableHiveSupport().getOrCreate() 

    val geouad = new GeoUAData(appName, sc, conf, args(1)) 

    val uaParser = Sparkudf(geouad.uaParser(_: String)) 
    val geolocation = Sparkudf(geouad.getGeoLocation(_: String)) 

    val sampleRdd = sc.textFile(args(0)) 
    val json = sampleRdd.filter(_.nonEmpty) 

    import spark.implicits._ 


    val sampleDF = spark.read.json(json) 

    val columns = sampleDF.select($"user-agent", $"source_ip") 
    .withColumn("sourceIp", $"source_ip") 
    .withColumn("geolocation", geolocation($"source_ip")) 

    .withColumn("uaParsed", uaParser($"user-agent")) 
    .withColumn("device", ($"uaParsed") ("device")) 
    .withColumn("os", ($"uaParsed") ("os")) 
    .withColumn("browser", ($"uaParsed") ("browser")) 
    .withColumn("country" , ($"geolocation")("country")) 
    .withColumn("state" , ($"geolocation")("state")) 
    .withColumn("city" , ($"geolocation")("city")) 
    .withColumn("zipCode" , ($"geolocation")("zipCode")) 
    .withColumn("latitude" , ($"geolocation")("latitude")) 
    .withColumn("longitude" , ($"geolocation")("longitude")) 
    .drop("geolocation") 
     .drop("uaParsed") 

질문 : 1. 우리는 UDF를 정의 이의를 클래스로 전환해야 하는가? (나는 그것을 싱글 톤으로 유지할 수있다.) 2. 클래스 멤버 함수를 UDF로 사용할 수 있습니까? 3. 이러한 UDF가 호출되면 allDF와 같은 클래스 멤버가 초기화 된 상태로 유지됩니까? 4. Val은 멤버 변수로 선언됩니다 - geouad 구축시 초기화 될까요?

저는 스칼라를 처음 사용하기 때문에 미리 지침/제안을 해주셔서 감사드립니다. UDF 호출 동안

답변

0
  1. 아니요 class에서 object로 스위칭 UDF를 형성하기 위해 필요한 것이 아니라, 단지 다르다.

  2. 예, 클래스 멤버 함수를 UDF으로 사용할 수 있지만 먼저이 함수를 UDF으로 등록해야합니다. 하나 UDF

  3. 예, 클래스 변수 발은 초기화됩니다 호출 할 때

    spark.sqlContext.udf.register ("registeredName", 클래스 방법 _)는

  4. 아니, 다른 방법은 초기화된다 geouad에 전화하여 몇 가지 작업을 수행하십시오.

+0

저는 # 2 - spark.sqlContext.udf.register ("geolocation", geouad.geolocation _)를 시도했습니다. 나는 널 포인터 예외를보고있다. @ logger.error ("NPE for"+ ipAddress + "allDF"+ allDF.count(), npe – Guruprasad