Maxmind snowplow library을 사용하여 데이터 프레임에있는 각 IP의 지오 데이터를 추출하려고합니다.Maxmind Geo 데이터가 포함 된 Spark UDF
우리는 스파크 SQL (스파크 버전 2.1.0)를 사용하고 난 다음 클래스에서 UDF를 생성 :
이class UdfDefinitions @Inject() extends Serializable with StrictLogging {
sparkSession.sparkContext.addFile("s3n://s3-maxmind-db/latest/GeoIPCity.dat")
val s3Config = configuration.databases.dataWarehouse.s3
val lruCacheConst = 20000
val ipLookups = IpLookups(geoFile = Some(SparkFiles.get(s3Config.geoIPFileName)),
ispFile = None, orgFile = None, domainFile = None, memCache = false, lruCache = lruCacheConst)
def lookupIP(ip: String): LookupIPResult = {
val loc: Option[IpLocation] = ipLookups.getFile.performLookups(ip)._1
loc match {
case None => LookupIPResult("", "", "")
case Some(x) => LookupIPResult(Option(x.countryName).getOrElse(""),
x.city.getOrElse(""), x.regionName.getOrElse(""))
}
}
val lookupIPUDF: UserDefinedFunction = udf(lookupIP _)
}
의도는 파일에 대한 포인터 UDF 외부 (ipLookups)을 생성하는 것입니다 내부에서 사용하므로 각 행에 파일을 열지 마십시오. 이것은 일련 번호가 부여되지 않은 작업의 오류를 가져오고 UDF에서 addFiles를 사용하면 너무 큰 파일 열기 오류가 발생합니다 (큰 데이터 세트를 사용할 때 작은 데이터 세트에서 작동 함).
이 스레드는 RDD를 사용하여 문제를 해결하는 방법을 보여 주지만 Spark SQL을 사용하고 싶습니다. using maxmind geoip in spark serialized
의견이 있으십니까? 감사합니다.