2017-12-11 8 views
0

Maxmind snowplow library을 사용하여 데이터 프레임에있는 각 IP의 지오 데이터를 추출하려고합니다.Maxmind Geo 데이터가 포함 된 Spark UDF

우리는 스파크 SQL (스파크 버전 2.1.0)를 사용하고 난 다음 클래스에서 UDF를 생성 :

class UdfDefinitions @Inject() extends Serializable with StrictLogging { 

sparkSession.sparkContext.addFile("s3n://s3-maxmind-db/latest/GeoIPCity.dat") 
val s3Config = configuration.databases.dataWarehouse.s3 
val lruCacheConst = 20000 
val ipLookups = IpLookups(geoFile = Some(SparkFiles.get(s3Config.geoIPFileName)), 
    ispFile = None, orgFile = None, domainFile = None, memCache = false, lruCache = lruCacheConst) 

def lookupIP(ip: String): LookupIPResult = { 
    val loc: Option[IpLocation] = ipLookups.getFile.performLookups(ip)._1 
    loc match { 
    case None => LookupIPResult("", "", "") 
    case Some(x) => LookupIPResult(Option(x.countryName).getOrElse(""), 
    x.city.getOrElse(""), x.regionName.getOrElse("")) 
    } 
} 

val lookupIPUDF: UserDefinedFunction = udf(lookupIP _) 

} 

의도는 파일에 대한 포인터 UDF 외부 (ipLookups)을 생성하는 것입니다 내부에서 사용하므로 각 행에 파일을 열지 마십시오. 이것은 일련 번호가 부여되지 않은 작업의 오류를 가져오고 UDF에서 addFiles를 사용하면 너무 큰 파일 열기 오류가 발생합니다 (큰 데이터 세트를 사용할 때 작은 데이터 세트에서 작동 함).

이 스레드는 RDD를 사용하여 문제를 해결하는 방법을 보여 주지만 Spark SQL을 사용하고 싶습니다. using maxmind geoip in spark serialized

의견이 있으십니까? 감사합니다.

답변

0

여기서 문제는 IpLookups가 Serializable이 아니라는 것입니다. 그러나 정적 파일 (frmo 내가 수집 한)에서 조회를하므로 수정할 수 있어야합니다. 나는 repo를 복제하고 IpLookups Serializable을 만들 것을 권할 것이다. 그런 다음 spark SQL로 작업하게하려면, 여러분이했던 것처럼 클래스의 모든 것을 감 쌉니다. 당신은 드라이버에서 모든 것을 할 수 있습니다 :

val IPResolver = new MySerializableIpResolver() 
val resolveIP = udf((ip : String) => IPResolver.resolve(ip)) 
data.withColumn("Result", resolveIP($"IP")) 

을 당신이 많은 고유 한 IP 주소가 다른 해결책이 있다고이없는 경우 다음과 같이 주요 스파크 작업에서, 당신은 뭔가를 쓸 수 있습니다.

val ipMap = data.select("IP").distinct.collect 
    .map(/* calls to the non serializable IpLookups but that's ok, we are in the driver*/) 
    .toMap 
val resolveIP = udf((ip : String) => ipMap(ip)) 
data.withColumn("Result", resolveIP($"IP"))