我在Pandas中創建了一些我試圖轉換爲PySpark代碼的代碼。它使用urlparse
Python庫將通用URI解析爲Python字典,將這些鍵轉換爲新列,然後將這些新列與原始數據連接起來。下面是一個簡化的例子。在真實數據集中有38列,我關心保留所有這些列。將Python字典轉換爲PySpark中的稀疏RDD或DF
# create some sample data
df_ex = pd.DataFrame([[102,'text1',u'/some/website/page.json?ovpevu&colc=1452802104103&si=569800363b029b74&rev=v4.1.2-wp&jsl=161&ln=en&pc=men&dp=www.mysite.com&qfq=news/this-is-an-article&of=2&uf=1&pd=0&irt=0&md=0&ct=1&tct=0&abt=0<=792&cdn=1&lnlc=gb&tl=c=141,m=433,i=476,xm=1243,xp=1254&pi=2&&rb=0&gen=100&callback=_ate.track.hsr&mk=some,key,words,about,the,article&'],
[781,'text2',u'/libtrc/hearst-network/loader.js'],
[9001,'text3',u'/image/view/-/36996720/highRes/2/-/maxh/150/maxw/150/mypic.jpg'],
[121,'text4',u'/website/page2.json?ovpevu&colc=1452802104103&si=569800363b029b74&rev=v4.1.2-wp&qqd=1&pd=0&irt=0&md=0&zzct=1&tct=0&abt=0<=792&cdn=0&lnlc=gb&tl=c=414,m=32,i=41,xm=1000,xp=111&callback=_ate.track.hsr&mk=some,other,key,words,about,the,article&'],
[781,'text5',u'/libtrc/hearst-network/loader.js']],columns=['num','text','uri'])
# parse the URI to a dict using urlparse
df_ex['uri_dict'] = df_ex['uri'].apply(lambda x: dict(urlparse.parse_qsl(urlparse.urlsplit(x).query)))
# convert the parsed dict to a series
df_ex_uridict_series = df_ex['uri_dict'].apply(pd.Series)
# concatenate the parsed dict (now columns) back with original DF
df_final = pd.concat([df_ex, df_ex_uridict_series], axis=1).drop('uri_dict', axis=1)
的東西,看起來像這樣(裁剪)得到的:
的結果是相當稀疏,但是這很好。對於應用程序,我實際上更喜歡它是一個稀疏矩陣(儘管我可以確信是否有一個很好的選擇,密集的方法)。這就是我試圖在PySpark中重新創建的結果。
到目前爲止我所在的地方(在PySpark 2.1.0中)(使用相同的數據)。
# urlparse library
import urlparse
# create the sample data as RDD
data = sc.parallelize([[102,'text1',u'/some/website/page.json?ovpevu&colc=1452802104103&si=569800363b029b74&rev=v4.1.2-wp&jsl=161&ln=en&pc=men&dp=www.mysite.com&qfq=news/this-is-an-article&of=2&uf=1&pd=0&irt=0&md=0&ct=1&tct=0&abt=0<=792&cdn=1&lnlc=gb&tl=c=141,m=433,i=476,xm=1243,xp=1254&pi=2&&rb=0&gen=100&callback=_ate.track.hsr&mk=some,key,words,about,the,article&'],[781,'text2',u'/libtrc/hearst-network/loader.js'],[9001,'text3',u'/image/view/-/36996720/highRes/2/-/maxh/150/maxw/150/mypic.jpg'],[121,'text4',u'/website/page2.json?ovpevu&colc=1452802104103&si=569800363b029b74&rev=v4.1.2-wp&qqd=1&pd=0&irt=0&md=0&zzct=1&tct=0&abt=0<=792&cdn=0&lnlc=gb&tl=c=414,m=32,i=41,xm=1000,xp=111&callback=_ate.track.hsr&mk=some,other,key,words,about,the,article&'],[781,'text5',u'/libtrc/hearst-network/loader.js']])
# simple map to parse the uri
uri_parsed = data.map(list).map(lambda x: [x[0],x[1],urlparse.parse_qs(urlparse.urlsplit(x[2]).query)])
這讓我非常接近,在RDD的每個「行」內嵌入了一個python字典。像這樣:
In [187]: uri_parsed.take(3)
Out[187]:
[[102,
'text1',
{u'abt': [u'0'],
u'callback': [u'_ate.track.hsr'],
u'cdn': [u'1'],
u'colc': [u'1452802104103'],
u'ct': [u'1'],
u'dp': [u'www.mysite.com'],
u'gen': [u'100'],
u'irt': [u'0'],
u'jsl': [u'161'],
u'ln': [u'en'],
u'lnlc': [u'gb'],
u'lt': [u'792'],
u'md': [u'0'],
u'mk': [u'some,key,words,about,the,article'],
u'of': [u'2'],
u'pc': [u'men'],
u'pd': [u'0'],
u'pi': [u'2'],
u'qfq': [u'news/this-is-an-article'],
u'rb': [u'0'],
u'rev': [u'v4.1.2-wp'],
u'si': [u'569800363b029b74'],
u'tct': [u'0'],
u'tl': [u'c=141,m=433,i=476,xm=1243,xp=1254'],
u'uf': [u'1']}],
[781, 'text2', {}],
[9001, 'text3', {}]]
這些值包含列表,但沒關係。他們可以留在列表中。
我現在想要做的是從字典中解析出鍵/值對(如Pandas中),從鍵創建新列,然後將值(或值列表)案件)在RDD中。
有些事情我已經試過:
- 走向全面的PySpark DF:寫了一個UDF,並使用
with_column
建立在DF的新列應用。這是有效的,但它將整個詞典作爲一個單獨的字符串(沒有鍵和值在引號中)。我沒有試圖推動這一點,並添加引號(認爲有更好的方法)。 - 拆分原始DF:首先使用
monotonically_increasing_id()
爲每個DF行分配一個唯一ID,拆分兩列(新ID和URI),將拆分轉換爲RDD,然後解析。這會讓我再回來(使用ID),但它並沒有幫助創建我想要的「稀疏矩陣」。
我還發現這些技術(使用Spark v2.1.0和Hive數據存儲)可能不是用於表示此類數據的正確底層技術。也許一個無模式的數據存儲會更好。但是,我現在限制使用Spark和Hive作爲數據存儲。
任何幫助將不勝感激!