2017-12-09 9 views
1

데이터를 CSV 파일로 보낼 수 있습니다. 먼저 임의 번호를 CSV 파일에 쓰고 보내십시오. 직접 보낼 수도 있습니까? 내 소켓 코드 :파이썬 - 정수 또는 문자열을 스파크 스트리밍으로 보내기

import socket 
host = 'localhost' 
port = 8080 

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
s.bind((host, port)) 
s.listen(1) 
while True: 
    print('\nListening for a client at',host , port) 
    conn, addr = s.accept() 
    print('\nConnected by', addr) 
    try: 
     print('\nReading file...\n') 
     while 1: 
      out = "test01" 
      print('Sending line', line) 
      conn.send(out) 
    except socket.error: 
     print ('Error Occured.\n\nClient disconnected.\n') 
conn.close() 

스파크 스트리밍 코드 :

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 

sc = SparkContext("local[2]","deneme") 
ssc = StreamingContext(sc, 10) 
socket_stream = ssc.socketTextStream("localhost",8080) 

random_integers = socket_stream.window(30) 

digits = random_integers.flatMap(lambda line: line.split(" ")).map(lambda digit: (digit, 1)) 

digit_count = digits.reduceByKey(lambda x,y:x+y) 
digit_count.pprint() 

ssc.start() 

답변

0

이 소켓 블록 데이터를 전송하기 때문이다 결코이 이동합니다. 가장 기본적인 해결책은 일정량의 데이터를 보내고 연결을 끊는 것입니다.

import socket 
import time 

host = 'localhost' 
port = 50007 

i = 0 

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 

s.bind((host, port)) 
s.listen(1) 

try: 
    while True: 
     conn, addr = s.accept() 
     try: 
      for j in range(10): 
       conn.send(bytes("{}\n".format(i), "utf-8")) 
       i += 1 
       time.sleep(1) 
      conn.close() 
     except socket.error: pass 
finally: 
    s.close() 

무언가 더 흥미로운 것을 얻으려면 제한 시간과 함께 비 차단 모드를 확인하십시오.

+0

더 빠른 방법이 있습니까? 연결을 열거 나 닫을 때마다 많은 시간이 낭비되기 때문입니다. –