아무도이 PySpark 문제를 도와주지 않겠습니까? 나는 이것에 일을 보냈다. 그냥 여러 번 인쇄 할 때 왜 내 스키마의 길이가 바뀌는 지 알 수 없습니다. Spark 버전은 2.2이고 Jupyter Notebook을 사용하여 20 노드의 클러스터에서 코드를 실행합니다. 여기 플랫 맵 이후에 스키마 길이가 변경되고 pyspark에 여러 번 인쇄 될 때
import myReader
# read data from binary files
data=sc.binaryFiles('Data/20171006')
# binary file reader convert binary file to a tuple of schema and data array
# the 1st item in the tuple is the schema of type StructType
# the 2nd item in the tuple is a numpy 2D array
tableWithSchemaRDD = data.map(myReader.convert)
# print out the length of the schema to check its length
# since the schema is the same for all items in the RDD, I only check the first one
print "1st print: ", len(tableWithSchemaRDD.first()[0])
# extract the data array from RDD
tableRDD = tableWithSchemaRDD.map(lambda x:x[1])
# print out the length of the schema to check its length again
print "2nd print: ", len(tableWithSchemaRDD.first()[0])
# flatmap so each item in the rdd is a row instead of 2D array
# and sort all the rows by the last item in each row, which is a timestamp
rowRDD = tableRDD.flatMap(lambda y:[z for z in y]).sortBy(lambda x:x[-1])
# print out the length of the schema multiple times
print "multiple print: "
print len(tableWithSchemaRDD.first()[0])
print len(tableWithSchemaRDD.first()[0])
print len(tableWithSchemaRDD.first()[0])
print len(tableWithSchemaRDD.first()[0])
print len(tableWithSchemaRDD.first()[0])
print len(tableWithSchemaRDD.first()[0])
이 출력됩니다 : 여기
내 코드 당신이 처음 2
print
는 RDD의 정확한 길이를 인쇄 볼 수 있듯이
1st print: 73
2nd print: 73
multiple print:
3961
3961
3961
3961
73
73
하지만 이후 flatMap
변환, 훨씬 더 큰 숫자로 증가했다. 그런 다음 잠시 후 (예 : 4 print
) 올바른 길이로 되돌아 왔습니다. 때로는 잘못된 숫자가 3961이 아니라 72의 배수 중 다른 숫자에 1을 더한 것입니다. 내 스키마의 첫 번째 72 StructField는 데이터의 이름이므로 73 번째 StructField는 타임 스탬프이므로 숫자 3961은 72 * 51 + 1입니다. 또한 11737, 23401, 9793, 2017 등의 숫자를 보았습니다.
코드에서 설명하는 코드는 myReader
입니다. 그것을 모듈로 가져 오기. 나는이 문제를 보지 못했다. 내 모듈을 노드에 배포하는 데 sc.addPyFile()
을 사용했습니다.
모든 의견 및 제안을 부탁드립니다. 고맙습니다!
답변 해 주셔서 감사합니다. 왜 당신은 거기에 유대 관계가 있다고 생각하는지 물어봐도 될까요? 모든 타임 스탬프는 고유하며'sortBy'에 사용됩니다. – dangwh