2017-12-12 10 views
-1

이 문제는 Pyspark와 관련이 있습니다. 필자는 컬럼이 거의없는 TSV 파일을 읽었습니다. 특정 열은 주석 열입니다. 내 임무는 언어에 따라 행을 필터링하는 것입니다. 예를 들어, 주석이 러시아어로 된 경우 특정 행을 필터링하여 별도의 파일로 저장하려고합니다.Pyspark - 언어를 기반으로 행 필터링

이제 파일을 읽는 동안 Dataframes를 만드는 코드를 사용하고 있습니다.

Info = sqlContext.read.format("csv"). \ 
option("delimiter","\t"). \ 
option("header", "True"). \ 
option("inferSchema", "True"). \ 
load("file.tsv") 

DataFrame[ID: int Comments: string] 

는 다음 나는 ASCII 값을 기준으로 레코드를 필터링 할 수 ORD 기능을 사용하려고 :

TypeError: argument 2 to map() must support iteration

샘플 : 그러나

Info.filter((map(ord,Info.Comments)) < 128).collect() 

, 나는 오류를 받고 있어요 입력 :

Comments 

{175:'Аксессуары'} 
{156:'Горные'} 
{45:'Кровати, диваны и кресла'} 
{45:'Кровати, диваны и кресла'} 

P 임대는 몇 가지 해결책을 제안합니다. 어떤 도움이나 제안도 환영합니다.

업데이트 :

ags29 @ 나는이 코드를 작성하여 주석에 언급 된 오류를 시정했다.

spark_ord=F.udf(lambda x: [ord(c) for c in x],t.ArrayType(IntegerType())) 
Info=Info.withColumn('russ', spark_ord('Comments')) 

DataFrame[ID: int, Comments: string, russ: array<int>] 

이제는 [Int] 배열을 만드는 것과 같은 문제가 있습니다. 나는 128보다 작아야하는 배열에있는 값을 기반으로 전체 행을 필터링해야합니다.

나는 그것을 성취하려고 애 쓰고 있습니다. 제발 제안 해주세요.

+0

내 대답이 도움이 되었습니까? 질문이있는 경우 알려주십시오 – ags29

+0

이 명령을 실행하는 동안 오류가 발생합니다. UDF가 제대로 작성되지 않았을 것 같습니다. 길이가 1이지만 예상되지 않는 길이의 문자가 발견 되었으나 문자가 입력되지 않았습니다. –

+0

입력 (예 : check 널 (null)이 없음). null을 제거하고 다시 테스트하십시오. 그것이 작동한다면 우리는 udf에서 오류 처리 만하면됩니다. 알려줘. – ags29

답변

0

@ ags29 의견을 보내 주셔서 감사합니다. 우리는 내가 NA로 대체하고,이 경우 일부 값으로 null 값을 교체해야

위에서 언급 한 바와 같이 파일을 읽어 Dataframe을 만든 후 : 여기

는 대답이다.

InfoWoNull = Info.fillna({'Comments':'NA'}) 

그런 다음 ORD 기능을 사용하여 문자열의 각 문자에 대한 ASCII 값을 찾으려면 UDF를 만듭니다. 출력은 정수 배열입니다.

from pyspark.sql import functions as F 
from pyspark.sql import types as t 
from pyspark.sql.types import ArrayType, IntegerType 

russ_ord=F.udf(lambda x: [ord(a) for a in x],t.ArrayType(IntegerType())) 

는 127보다 큰

def russian_filter(x): 
for index in range(len(x)): 
    if x[index] > 127: 
    return True 
return False 

filter_udf = F.udf(russian_filter, BooleanType()) 

아래처럼 마지막 단계에서 사용 ASCII 문자를 기반으로 값을 필터링하는 필터 함수를 만듭니다.

Info_rus = InfoWoNull.filter(filter_udf(russ_ord('SearchParams')) == 'true') 
Info_rus.show() 
0

이 테스트되지 않지만,이 라인을 따라 뭔가 일을해야 :

기본적으로
from pyspark.sql.functions import udf 
from pyspark.sql.types import IntegerType 

# create user defined function from ord 
spark_ord=udf(lambda x: ord(x), IntegerType()) 

Info=Info.withColumn('ord', spark_ord('Comments')) 
Info=Info.filter('ord<128') 

가하는 DataFrame으로 ord 기능을 사용하려면, 당신은 함수를 정의 사용자가 필요합니다. 시도한 방법은 DataFrame이 아닌 RDD가 필요합니다.

+0

이 명령을 실행하는 동안 오류가 발생합니다. UDF가 제대로 작성되지 않았을 것 같습니다. TypeError : length() 예상 길이가 1 인 문자열이지만 길이가 1 인 유형이 발견되었습니다 (예 : –