2016-06-11 4 views
3

Spark를 사용하여 Postgres 데이터베이스로 텍스트 파일을 내보내려고합니다. 개별 텍스트 파일을 내보내려면 아래 코드를 사용하고 있습니다. 나는 같은 폴더에 200 개의 텍스트 파일을 가지고 있고 모든 텍스트 파일은 같은 구조를 가지고있다. 불행히도 올해 값은 필자의 입력 파일의 일부가 아니기 때문에 코딩하기가 어렵습니다.Spark - Automation을 사용하여 PostgreSQL로 텍스트 파일 내보내기

한 번에 모든 파일을 업로드하고 싶지만 수행 방법을 모르겠습니까? 누구든지 제안 사항을 알려주십시오.

from pyspark.sql import SQLContext, Row 
sqlContext = SQLContext(sc) 

lines = sc.textFile("/aaaa/bbbb/DataFile/t-1870.txt") 
splits = lines.map(lambda l: l.split(",")) 
raw_data = splits.map(lambda b: Row(name=b[0], gender=b[1],count=int(b[2]),year=int(1870))) 

schemaBabies = sqlContext.createDataFrame(raw_data) 
schemaBabies.registerTempTable("raw_data") 

df = sqlContext.sql("select * from raw_data") 

pgurl="jdbc:postgresql://localhost:5432/sparkling?user=XXXX&password=XXXX" 
properties={"user":"XXXX","password":"XXXX","driver":"org.postgresql.Driver","mode":"append"} 

df.write.jdbc(url = pgurl ,table = "EDW.raw_data",properties=properties) 

답변

2

는 데이터가 다음과 같습니다 가정하자 :

from pyspark.sql.types import * 

schema = StructType([ 
    StructField("name", StringType(), True), 
    StructField("gender", StringType(), True), 
    StructField("count", LongType(), True) 
]) 

df = (sqlContext.read.format("com.databricks.spark.csv") 
    .schema(schema) 
    .load(out)) 

추출 년도 :

import csv 
import tempfile 
import os 

out = tempfile.mkdtemp() 
data = [ 
    ("1870", [("Jane Doe", "F", 3)]), 
    ("1890", [("John Doe", "M", 1)]), 
] 

for year, rows in data: 
    with open(os.path.join(out, "t-{0}.txt".format(year)), "w") as fw: 
     csv.writer(fw).writerows(rows) 

시작 PySpark 세션이나 스크립트가 지정된 스키마와 올바른 spark-csv--packages에 인수 및로드 데이터를 전달 제출 파일 이름에서 다음을 작성하십시오.

from pyspark.sql.functions import input_file_name, regexp_extract 

df_with_year = (df.withColumn(
    "year", 
    regexp_extract(input_file_name(), "[1-2][0-9]{3}", 0).cast("int"))) 

df_with_year.show() 
## +--------+------+-----+----+ 
## | name|gender|count|year| 
## +--------+------+-----+----+ 
## |John Doe|  M| 1|1890| 
## |Jane Doe|  F| 3|1870| 
## +--------+------+-----+----+ 

df_with_year.write.jdbc(...) 

중요 : Spark < 2.0에서이 접근 방식은 파이썬과 JVM간에 데이터를 전달하지 않는 것에 달려 있습니다. 을 Python UDF 또는 DataFrame.rdd.map과 함께 사용할 수 없습니다.

+1

입력 내용을 기반으로 코드를 일부 변경했으며 데이터베이스에 200 개 이상의 텍스트 파일을 모두로드 할 수있었습니다. 정말 도움을 주셨습니다. – ytasfeb15