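TypeError: object of type 'map' has no len() (Python 3)
I am trying to implement the KMeans algorithm using PySpark, and the last line of the while loop raises the error above. The code works fine outside the loop, but the error appeared once I wrapped it in the loop. How can I fix this?
This happens because you are trying to take the len of a map object, which in Python 3 is a lazy, generator-like type rather than a list and therefore has no len().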
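The behaviour behind the error in the title can be reproduced without Spark. The following is a minimal sketch, assuming Python 3; the names `points`, `lazy` and `materialized` are made up for illustration and do not come from the question's code. It shows that `map` returns a lazy iterator with no length, and that wrapping the call in `list()` gives back something that supports `len()`.

```python
# Hypothetical sample data, standing in for the k-means centers.
points = [(1.0, 2.0), (3.0, 4.0), (5.0, 6.0)]

# In Python 3, map() returns a lazy iterator, not a list.
lazy = map(lambda p: p[0], points)

try:
    len(lazy)
    error_message = None
except TypeError as exc:
    error_message = str(exc)

# Materializing the iterator gives a list, which supports len() and indexing.
materialized = list(map(lambda p: p[0], points))

print(error_message)
print(len(materialized), materialized[0])
```
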
# Find K Means of Loudacre device status locations
#
# Input data: file(s) with device status data (delimited by '|')
# including latitude (13th field) and longitude (14th field) of device locations
# (lat,lon of 0,0 indicates unknown location)
# NOTE: Copy to pyspark using %paste
# for a point p and an array of points, return the index in the array of the point closest to p
def closestPoint(p, points):
    bestIndex = 0
    closest = float("+inf")
    # for each point in the array, calculate the distance to the test point, then return
    # the index of the array point with the smallest distance
    for i in range(len(points)):
        dist = distanceSquared(p,points[i])
        if dist < closest:
            closest = dist
            bestIndex = i
    return bestIndex
# The squared distances between two points
def distanceSquared(p1,p2):
    return (p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2
# The sum of two points
def addPoints(p1,p2):
    return [p1[0] + p2[0], p1[1] + p2[1]]
# The files with device status data
filename = "/loudacre/devicestatus_etl/*"
# K is the number of means (center points of clusters) to find
K = 5
# ConvergeDist -- the threshold "distance" between iterations at which we decide we are done
convergeDist=.1
# Parse device status records into [latitude,longitude]
rdd2=rdd1.map(lambda line:(float((line.split(",")[3])),float((line.split(",")[4]))))
# Filter out records where lat/long is unavailable -- ie: 0/0 points
# TODO
filterd=rdd2.filter(lambda x:x!=(0,0))
# start with K randomly selected points from the dataset
# TODO
sample=filterd.takeSample(False,K,42)
# loop until the total distance between one iteration's points and the next is less than the convergence distance specified
tempDist =float("+inf")
while tempDist > convergeDist:
    # for each point, find the index of the closest kpoint. map to (index, (point,1))
    # TODO
    indexed =filterd.map(lambda (x1,x2):(closestPoint((x1,x2),sample),((x1,x2),1)))
    # For each key (k-point index), reduce by adding the coordinates and number of points
    reduced=indexed.reduceByKey(lambda x,y: ((x[0][0]+y[0][0],x[0][1]+y[0][1]),x[1]+y[1]))
    # For each key (k-point index), find a new point by calculating the average of each closest point
    # TODO
    newCenters=reduced.mapValues(lambda x1: [x1[0][0]/x1[1], x1[0][1]/x1[1]]).sortByKey()
    # calculate the total of the distance between the current points and new points
    newSample=newCenters.collect() #new centers as a list
    samples=zip(newSample,sample) #sample=> old centers
    samples1=sc.parallelize(samples)
    totalDistance=samples1.map(lambda x:distanceSquared(x[0][1],x[1]))
    # Copy the new points to the kPoints array for the next iteration
    tempDist=totalDistance.sum()
    sample=map(lambda x:x[1],samples) #new sample for next iteration as list
sample
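One possible way to fix the end of the loop is sketched below in plain Python 3, so that it runs without a Spark context. The values of `newSample` and `sample` are invented for illustration; in the question's code `newSample` would come from `newCenters.collect()`, which yields `(index, [lat, lon])` pairs. The sketch assumes every cluster received at least one point, so both lists have the same length. It also assumes the intent is to carry the new centers forward (`new[1]`), since the comment says "new sample for next iteration", whereas `x[1]` in the question's last line selects the old center of each pair.

```python
def distanceSquared(p1, p2):
    # Same helper as in the question: squared Euclidean distance.
    return (p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2

# Hypothetical old centers (what `sample` holds at the start of an iteration).
sample = [(0.0, 0.0), (10.0, 10.0)]

# Hypothetical result of newCenters.collect(): (index, [lat, lon]) pairs sorted by index.
newSample = [(0, [1.0, 1.0]), (1, [10.0, 12.0])]

# zip() is also a one-shot iterator in Python 3, so materialize it before reusing it.
samples = list(zip(newSample, sample))

# Total squared movement of the centers; only K pairs, so no RDD is needed here.
tempDist = sum(distanceSquared(new[1], old) for new, old in samples)

# Next iteration's centers as a real list, so len(points) in closestPoint keeps working.
sample = [new[1] for new, old in samples]

print(tempDist)
print(sample)
```

Separately, `lambda (x1,x2): ...` uses tuple parameters, which are Python 2 only syntax. Under Python 3 that lambda would need to take a single argument, for example `lambda p: (closestPoint(p, sample), (p, 1))`.
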
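The error message seems pretty explicit to me: in Python 3, `map` returns a generator rather than a list as in Python 2. – miradulo
Please post the stack trace. You posted 100 lines of code without mentioning which line has the problem. – tdelaney
Related (possible duplicate): http://stackoverflow.com/a/12319034/748858 – mgilson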