Unable to load a Cassandra table using the Spark session with sparklyr and R

I am using Spark 2.1.0 and trying to connect to a Cassandra cluster. I am on the latest Spark release, and I set up the basic configuration as shown below:
# local-only configuration
sparklyr.cores.local: !expr parallel::detectCores()
spark.sql.shuffle.partitions.local: !expr parallel::detectCores()
# cassandra settings
spark.cassandra.connection.host: <Cassandra IP>
spark.cassandra.auth.username: <uid>
spark.cassandra.auth.password: <pass>
sparklyr.defaultPackages:
- com.databricks:spark-csv_2.11:1.5.0
- com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-RC1
- com.datastax.cassandra:cassandra-driver-core:3.1.4
The jars are in the root directory where the source files live.
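For context, a minimal sketch of how this config.yml gets picked up (assuming it sits in the working directory, which is where spark_config() looks by default):

library(sparklyr)

# spark_config() reads config.yml from the working directory by default,
# so the Cassandra settings and default packages above apply to the
# connection created below.
config <- spark_config()
sc <- spark_connect(master = "local", version = "2.1.0", config = config)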
Everything went fine until I tried to call the read function. Here is what I ran; note that I set the jar location explicitly:
> library(sparklyr)
> config <- spark_config()
Warning message:
In readLines(input, encoding = "UTF-8") :
incomplete final line found on '/home/bsc/BSCAnalytics/config.yml'
> config[["sparklyr.jars.default"]] <- c("/home/bsc/BSCAnalytics/cassandra-driver-core-3.1.4.jar")
>
> sc <- spark_connect(master = "local", version = "2.1.0")
Warning message:
In readLines(input, encoding = "UTF-8") :
incomplete final line found on '/home/bsc/BSCAnalytics/config.yml'
> Spark.session <- sparklyr::invoke_static(sc, "org.apache.spark.sql.SparkSession", "builder") %>% sparklyr::invoke("config", "spark.cassandra.connection.host", "<Cassandra IP>") %>% sparklyr::invoke("getOrCreate")
When I try to call the read function, the runtime cannot find the jar:
> event_df <- invoke(Spark.session, "read") %>% invoke("format", "org.apache.spark.sql.cassandra") %>% invoke("option", "keyspace", "kps") %>% invoke("option", "table", "tab_event") %>% invoke("load")
Error: java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.cassandra. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:569)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:86)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:86)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:325)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at sparklyr.Invoke$.invoke(invoke.scala:94)
at sparklyr.StreamHandler$.handleMethodCall(stream.scala:89)
at sparklyr.StreamHandler$.read(stream.scala:55)
at sparklyr.BackendHandler.channelRead0(handler.scala:49)
at sparklyr.BackendHandler.channelRead0(handler.scala:14)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.cassandra.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25$$anonfun$apply$13.apply(DataSource.scala:554)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25$$anonfun$apply$13.apply(DataSource.scala:554)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25.apply(DataSource.scala:554)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25.apply(DataSource.scala:554)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:554)
... 39 more
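The missing class, org.apache.spark.sql.cassandra.DefaultSource, ships in the spark-cassandra-connector artifact, not in cassandra-driver-core, so the trace suggests the connector never made it onto the classpath. As a point of comparison, a minimal sketch of letting sparklyr resolve the connector from Maven instead of overriding sparklyr.jars.default with only the driver jar (assuming Maven Central is reachable from the machine):

library(sparklyr)

# Sketch: have sparklyr resolve the connector from Maven rather than
# pointing sparklyr.jars.default at the driver-core jar alone.
config <- spark_config()
config$sparklyr.defaultPackages <- c(
  "com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-RC1"
)
sc <- spark_connect(master = "local", version = "2.1.0", config = config)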
Hi Javier, thanks for the reference; I am seeing the error above. Let me share some details of our setup and ask for your recommendation. Our Spark (2.1) cluster is not co-located with the Cassandra (3.x) nodes, and Cassandra is protected by a user id and password. Please help me with the following: Where should the Spark Cassandra connector jar be placed, and how do I point sparklyr at it? It looks like you pass the Maven coordinates to the function that sets the config parameters. Where do we pass the cluster IP, user id, and password? And how do we load frozen or other Cassandra composite objects into a DF for further analysis? – SCB
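For reference, a minimal sketch of where the cluster IP and credentials could be supplied, reusing the spark_config() route from the question (the <Cassandra IP>, <uid>, and <pass> placeholders stand in for the real values):

library(sparklyr)

# Sketch: set the Cassandra host and credentials as Spark configuration
# entries before connecting, mirroring the config.yml entries above.
config <- spark_config()
config$spark.cassandra.connection.host <- "<Cassandra IP>"
config$spark.cassandra.auth.username   <- "<uid>"
config$spark.cassandra.auth.password   <- "<pass>"
sc <- spark_connect(master = "local", version = "2.1.0", config = config)
# The same invoke() read chain shown in the question can then be run against sc.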