2017-02-10 68 views
1

我正在嘗試使用三個表的連接在SPARK SQL中編寫查詢。但是查詢輸出爲空。它對單個表格工作正常。我的Join查詢是正確的,因爲我已經在oracle數據庫中執行了它。我需要在這裏修改什麼修正?星火版本是2.0.0在Spark中加入超過2個表SQL

from pyspark.sql import SQLContext, Row 
sqlContext = SQLContext(sc) 

lines = sc.textFile("/Users/Hadoop_IPFile/purchase") 
lines2 = sc.textFile("/Users/Hadoop_IPFile/customer") 
lines3 = sc.textFile("/Users/Hadoop_IPFile/book") 

parts = lines.map(lambda l: l.split("\t")) 
purchase = parts.map(lambda p: Row(year=p[0],cid=p[1],isbn=p[2],seller=p[3],price=int(p[4]))) 
schemapurchase = sqlContext.createDataFrame(purchase) 
schemapurchase.registerTempTable("purchase") 


parts2 = lines.map(lambda l: l.split("\t")) 
customer = parts2.map(lambda p: Row(cid=p[0],name=p[1],age=p[2],city=p[3],sex=p[4])) 
schemacustomer = sqlContext.createDataFrame(customer) 
schemacustomer.registerTempTable("customer") 

parts3 = lines.map(lambda l: l.split("\t")) 
book = parts3.map(lambda p: Row(isbn=p[0],name=p[1])) 
schemabook = sqlContext.createDataFrame(book) 
schemabook.registerTempTable("book") 

result_purchase = sqlContext.sql("""SELECT DISTINCT customer.name AS name FROM purchase JOIN book ON purchase.isbn = book.isbn JOIN customer ON customer.cid = purchase.cid WHERE customer.name != 'Harry Smith' AND purchase.isbn IN (SELECT purchase.isbn FROM customer JOIN purchase ON customer.cid = purchase.cid WHERE customer.name = 'Harry Smith')""") 

result = result_purchase.rdd.map(lambda p: "name: " + p.name).collect() 
for name in result: 
    print(name) 


DataSet 
--------- 
Purchase 
1999 C1 B1 Amazon 90 
2001 C1 B2 Amazon 20 
2008 C2 B2 Barnes Noble 30 
2008 C3 B3 Amazon 28 
2009 C2 B1 Borders 90 
2010 C4 B3 Barnes Noble 26 


Customer 
C1 Jackie Chan 50 Dayton M 
C2 Harry Smith 30 Beavercreek M 
C3 Ellen Smith 28 Beavercreek F 
C4 John Chan 20 Dayton M 

Book 
B1 Novel 
B2 Drama 
B3 Poem 

我發現下面一些網頁的指令,但它仍然沒有工作:schemapurchase.join(schemabook,schemapurchase.isbn == schemabook.isbn)schemapurchase.join(schemacustomer,schemapurchase .cid == schemacustomer.cid)

+0

你想要的輸出是什麼? – pheeleeppoo

+1

「成龍」是我正在尋找的輸出。 – SPram

回答

2

鑑於此輸入DataFrames就像你的例子(對不起,如果一些列名是錯的,我猜他們):

購買:

+----+---+----+------------+-----+ 
|year|cid|isbn|  shop|price| 
+----+---+----+------------+-----+ 
|1999| C1| B1|  Amazon| 90| 
|2001| C1| B2|  Amazon| 20| 
|2008| C2| B2|Barnes Noble| 30| 
|2008| C3| B3|  Amazon| 28| 
|2009| C2| B1|  Borders| 90| 
|2010| C4| B3|Barnes Noble| 26| 
+----+---+----+------------+-----+ 

客戶:

+---+-----------+---+-----------+-----+ 
|cid|  name|age|  city|genre| 
+---+-----------+---+-----------+-----+ 
| C1|Jackie Chan| 50|  Dayton| M| 
| C2|Harry Smith| 30|Beavercreek| M| 
| C3|Ellen Smith| 28|Beavercreek| F| 
| C4| John Chan| 20|  Dayton| M| 
+---+-----------+---+-----------+-----+ 

書:

+----+-----+ 
|isbn|genre| 
+----+-----+ 
| B1|Novel| 
| B2|Drama| 
| B3| Poem| 
+----+-----+ 

可以使用數據框的功能轉換該SQL查詢,像如下:

val result = purchase.join(book, purchase("isbn")===book("isbn")) 
        .join(customer, customer("cid")===purchase("cid")) 
        .where(customer("name") !== "Harry Smith") 
        .join(temp, purchase("isbn")===temp("purchase_isbn")) 
        .select(customer("name").as("NAME")).distinct() 

其中「TEMP」是結果「SELECT IN」,這可以認爲是另一個連接的結果:

val temp = customer.join(purchase, customer("cid")===purchase("cid")) 
        .where(customer("name")==="Harry Smith") 
        .select(purchase("isbn").as("purchase_isbn"))  


+-------------+ 
|purchase_isbn| 
+-------------+ 
|   B2| 
|   B1| 
+-------------+ 

所以最後的結果是:

+-----------+ 
|  NAME| 
+-----------+ 
|Jackie Chan| 
+-----------+ 

考慮這個答案好像你就可以開始從思考一個點(太多連接可能對性能不利影響,例如)。

+0

感謝您的解決方案。 – SPram