2017-07-16 98 views

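Spark Streaming application cannot receive strings on a port

I want to write a Spark Streaming application that counts the random integers arriving in a data stream. Here is the Spark application I wrote: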

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 

sc = SparkContext("local[2]", "IntegerCount") # 2 threads, app name 
ssc = StreamingContext(sc, 1) # sc, time interval for batch update. 

nums = ssc.socketTextStream("localhost", 8000) # stream data from TCP; source, port 

# create key,value pairs 
tests = nums.map(lambda num: (int(num), 1)) 

# Count each integer in each batch 
intCounts = tests.reduceByKey(lambda x, y: x + y) 

# Print 
intCounts.pprint() 

ssc.start()    # Start the computation 
ssc.awaitTermination() # Wait for the computation to terminate 

I serve the random numbers on port 8000 with Server.py:

import socket 
from random import randint 

host = 'localhost' 
port = 8000 
address = (host, port) 

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
server_socket.bind(address) 
server_socket.listen(5) 


print "Listening for client . . ." 
conn, address = server_socket.accept() 
print "Connected to client at ", address 
#pick a large output buffer size because i dont necessarily know how big the incoming packet is 
while True: 
    output = str(randint(0, 10)) 
    conn.send(output) 

When I run Server.py and my Spark application, the connection is established successfully. However, every batch is printed empty, like this:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
Setting default log level to "WARN". 
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 
------------------------------------------- 
Time: 2017-07-16 22:36:11 
------------------------------------------- 

------------------------------------------- 
Time: 2017-07-16 22:36:12 
------------------------------------------- 

I don't know what the problem is. Could someone help me understand what is going on?


Does nobody know what the problem is? – fcgtyg

Answer


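Solved. I now terminate each string I send with "\n", and it works. socketTextStream reads its input as newline-delimited lines, so without the delimiter no record was ever completed and every batch stayed empty.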

import socket 
from random import randint 

host = 'localhost' 
port = 8000 
address = (host, port) 

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
server_socket.bind(address) 
server_socket.listen(5) 


print "Listening for client . . ." 
conn, address = server_socket.accept() 
print "Connected to client at ", address 
# Send an endless stream of random integers, one per line 
while True: 
    output = str(randint(0, 10)) + "\n"  # THE FIX: terminate each record with a newline 
    conn.send(output) 
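
For anyone on Python 3, here is a minimal sketch of why the newline matters. It is an illustration, not part of the original fix. It assumes Python 3, where sockets send bytes rather than str. The helper names encode_record and read_records are hypothetical. A socket.socketpair() stands in for the real connection between Server.py and Spark, and the line-oriented reader only approximates how a newline-delimited receiver splits the stream into records.

```python
import socket
from random import randint


def encode_record(value):
    """Return one newline-terminated, UTF-8 encoded record (hypothetical helper)."""
    return (str(value) + "\n").encode("utf-8")


def read_records(sock_file, count):
    """Read `count` newline-delimited records, as a line-oriented receiver would."""
    return [sock_file.readline().rstrip("\n") for _ in range(count)]


# A connected socket pair stands in for the server <-> Spark connection.
sender, receiver = socket.socketpair()

values = [randint(0, 10) for _ in range(5)]
for v in values:
    # sendall() keeps sending until the whole record has been written.
    sender.sendall(encode_record(v))
sender.close()

# Each readline() returns exactly one record because of the trailing "\n".
with receiver.makefile("r", encoding="utf-8") as reader:
    received = read_records(reader, len(values))
receiver.close()

print(received)
```

Without the "\n", the five values would arrive as one undelimited run of digits and the reader could not tell where one integer ends and the next begins.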