1

如何在PySpark中設置流式傳輸DataFrame的模式。使用套接字進行火花傳輸,設置SCHEMA,在控制檯中顯示DATAFRAME

from pyspark.sql import SparkSession 
from pyspark.sql.functions import explode 
from pyspark.sql.functions import split 
# Import data types 
from pyspark.sql.types import * 

spark = SparkSession\ 
    .builder\ 
    .appName("StructuredNetworkWordCount")\ 
    .getOrCreate() 

# Create DataFrame representing the stream of input lines from connection to localhost:5560 
lines = spark\ 
    .readStream\ 
    .format('socket')\ 
    .option('host', '192.168.0.113')\ 
    .option('port', 5560)\ 
    .load() 

比如我需要一個像一個表:

Name, lastName, PhoneNumber  
Bob, Dylan, 123456  
Jack, Ma, 789456 
.... 

如何設置頁眉/模式爲[ '姓名', 'lastName的', '******中國'] 與它們的數據類型。

此外,是否有可能持續顯示此表,或說是DataFrame的前20行。當我嘗試了,我得到了錯誤

「pyspark.sql.utils.AnalysisException:不支持「完全輸出模式時,有上的流DataFrames /數據集;; \ nProject沒有流聚合」

回答

4

TextSocketSource不提供任何集成解析選項。只有可以使用兩種格式之一:

  • 時間戳和文本,如果includeTimestamp設置爲true下面的模式:

    StructType([ 
        StructField("value", StringType()), 
        StructField("timestamp", TimestampType()) 
    ]) 
    
  • 文本只有includeTimestamp設置爲false與如下所示的模式:

    StructType([StructField("value", StringType())])) 
    

如果要更改此格式,您必須將流轉換爲提取感興趣區域,例如使用正則表達式:

from pyspark.sql.functions import regexp_extract 
from functools import partial 

fields = partial(
    regexp_extract, str="value", pattern="^(\w*)\s*,\s*(\w*)\s*,\s*([0-9]*)$" 
) 

lines.select(
    fields(idx=1).alias("name"), 
    fields(idx=2).alias("last_name"), 
    fields(idx=3).alias("phone_number") 
) 
相關問題