2017-12-27 36 views
2

최근에 Flink 1.3.2에서 1.4.0으로 업그레이드를 시도했으며 더 이상 org.apache.hadoop.fs.{FileSystem, Path}을 가져올 수없는 몇 가지 문제가 있습니다. 문제는 두 곳에서 발생 :Flink 1.3.2에서 1.4.0으로 업그레이드하기 hadoop FileSystem 및 경로 문제

ParquetWriter :

import org.apache.avro.Schema 
import org.apache.avro.generic.GenericRecord 
import org.apache.hadoop.fs.{FileSystem, Path} 
import org.apache.flink.streaming.connectors.fs.Writer 
import org.apache.parquet.avro.AvroParquetWriter 
import org.apache.parquet.hadoop.ParquetWriter 
import org.apache.parquet.hadoop.metadata.CompressionCodecName 

class AvroWriter[T <: GenericRecord]() extends Writer[T] { 

    @transient private var writer: ParquetWriter[T] = _ 
    @transient private var schema: Schema = _ 

    override def write(element: T): Unit = { 
    schema = element.getSchema 
    writer.write(element) 
    } 

    override def duplicate(): AvroWriter[T] = new AvroWriter[T]() 

    override def close(): Unit = writer.close() 

    override def getPos: Long = writer.getDataSize 

    override def flush(): Long = writer.getDataSize 

    override def open(fs: FileSystem, path: Path): Unit = { 
    writer = AvroParquetWriter.builder[T](path) 
     .withSchema(schema) 
     .withCompressionCodec(CompressionCodecName.SNAPPY) 
     .build() 
    } 

} 

CustomBucketer :

import org.apache.flink.core.fs.{FileSystem, Path} 

그러나 새로운 Path하지 않습니다 : 나는 FLINK 지금 가지고 것으로 나타났습니다

import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer 
import org.apache.flink.streaming.connectors.fs.Clock 
import org.apache.hadoop.fs.{FileSystem, Path} 
import java.io.ObjectInputStream 
import java.text.SimpleDateFormat 
import java.util.Date 

import org.apache.avro.generic.GenericRecord 

import scala.reflect.ClassTag 

class RecordFieldBucketer[T <: GenericRecord: ClassTag](dateField: String = null, dateFieldFormat: String = null, bucketOrder: Seq[String]) extends Bucketer[T] { 

    @transient var dateFormatter: SimpleDateFormat = _ 

    private def readObject(in: ObjectInputStream): Unit = { 
    in.defaultReadObject() 
    if (dateField != null && dateFieldFormat != null) { 
     dateFormatter = new SimpleDateFormat(dateFieldFormat) 
    } 
    } 

    override def getBucketPath(clock: Clock, basePath: Path, element: T): Path = { 
    val partitions = bucketOrder.map(field => { 
     if (field == dateField) { 
     field + "=" + dateFormatter.format(new Date(element.get(field).asInstanceOf[Long])) 
     } else { 
     field + "=" + element.get(field) 
     } 
    }).mkString("/") 
    new Path(basePath + "/" + partitions) 
    } 

} 

AvroParquetWriter 또는으로 작동하는 것으로 보입니다. 0 방법. Flink의 FileSystem 및 Hadoop 종속성에 대한 몇 가지 변경 사항이 있으며, 코드를 다시 가져 오려면 무엇을 가져올 지 확신 할 수 없습니다.

Hadoop 의존성을 사용해야 할 필요가 있습니까, 아니면 Parquet 파일을 s3에 쓰거나 버켓팅하는 다른 방법이 있습니까?

build.sbt :는 "하둡 - 자유 FLINK"을 구축

val flinkVersion = "1.4.0" 

libraryDependencies ++= Seq(
    "org.apache.flink" %% "flink-scala" % flinkVersion % Provided, 
    "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided, 
    "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion, 
    "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion, 
    "org.apache.flink" % "flink-metrics-core" % flinkVersion, 
    "org.apache.flink" % "flink-metrics-graphite" % flinkVersion, 
    "org.apache.kafka" %% "kafka" % "0.10.0.1", 
    "org.apache.avro" % "avro" % "1.7.7", 
    "org.apache.parquet" % "parquet-hadoop" % "1.8.1", 
    "org.apache.parquet" % "parquet-avro" % "1.8.1", 
    "io.confluent" % "kafka-avro-serializer" % "3.2.2", 
    "com.fasterxml.jackson.core" % "jackson-core" % "2.9.2" 
) 

답변

0

org.apache.hadoop.fs.{FileSystem, Path} 클래스는 hadoop-commons 프로젝트에서 찾을 수 있습니다.

1

는 1.4 릴리스 중 하나 개 큰 특징이었다. 당신이해야 할 모든 클래스 패스에 하둡 종속성을 포함하거나 인용하는 것입니다 changelogs :

...이 또한 경우에 당신이 같은 BucketingSink 또는 RollingSink, 당신으로, HDFS에 커넥터를 사용하는 곳을 의미합니다 이제는 번들 된 Hadoop 의존성이있는 Flink 배포판을 사용하거나 응용 프로그램 용 jar 파일을 빌드 할 때 Hadoop 종속성을 포함시켜야합니다.

+0

내가 포함해야하는 종속성을 추적하려고 노력할 것입니다. 또한 내가 s3에 쪽 마루를 씀을하기 위해 의존성을 포함 할 필요가 있거나 심지어 플린 크 1.4에서 다른 일을하는 다른 방법이 있다면 궁금 할 것이다. – moku