2015-11-07 87 views
2

對於Spark和SparkR,我相當新,並且可能有一些基本問題。爲什麼Window函數(延遲)在SparkR中不起作用?

本練習的目的是在SparkR中實現窗口函數(lead,lag,rank等)。

我稱爲那裏提到下面的鏈接和Databricks後但沒有利用 -

SparkSQL - Lag function?

代碼段我使用:

初始化sqlContext並註冊該數據幀作爲一個臨時表使用 Registertemptable

output_data<-SparkR::sql(sqlContext, "select *,lag(type) over(partition by key order by key) as lag_type from input_data") 

錯誤我們面臨的是:

failure: ``union'' expected but `(' found 

另一個建議,我發現是使用一個Hivecontext而非SQLcontext作爲SQLcontext可能不允許所有功能。

在該方法中,初始化Hivecontext並試圖運行HiveQL 做同樣給了我們一個錯誤說:

cannot find table named input_table 

問:做我們需要運行類似registertemptable所以一些命令至於允許Hivecontext訪問表?

saveastable可能是一個選項,但從我讀到的內容來看,它將收集S3存儲中的數據,而不是將其存儲在羣集的內存中。

希望對此有所幫助! 謝謝!

+1

人們似乎在這個問題上這麼多的問題。他們並不都很清楚。你願意重溫你的問題嗎? – eliasah

回答

1

讓我們使用freeny數據集準備輸入data.frame

ldf <- freeny 

# Extract year and quater 
ldf$yr <- as.integer(rownames(ldf)) 
ldf$qr <- as.integer(4 * (as.numeric(rownames(ldf)) - ldf$yr)) 

# Clean column names 
colnames(ldf) <- gsub("\\.", "_", colnames(ldf)) 

# Drop a couple of things so output fits nicely in the code box 
row.names(ldf) <- NULL 
ldf$market_potential <- NULL 

head(ldf) 


##   y lag_quarterly_revenue price_index income_level yr qr 
## 1 8.79236    8.79636  4.70997  5.82110 1962  1 
## 2 8.79137    8.79236  4.70217  5.82558 1962  2 
## 3 8.81486    8.79137  4.68944  5.83112 1962  3 
## 4 8.81301    8.81486  4.68558  5.84046 1963  0 
## 5 8.90751    8.81301  4.64019  5.85036 1963  1 
## 6 8.93673    8.90751  4.62553  5.86464 1963  2 

我發現另一項建議是使用Hivecontext而非SQLcontext作爲SQLcontext可能不允許所有功能。

這是正確的,最先進的功能僅被HiveContext支持,而默認的是SQLContext。首先,你必須確保你的Spark版本已經用Hive支持構建。關於Spark downloads page可用的二進制文件是真的,但是如果您從源代碼構建,則一定要使用-Phive標誌。

hiveContext <- sparkRHive.init(sc) 
sdf <- createDataFrame(hiveContext, ldf) 
printSchema(sdf) 

## root 
## |-- y: double (nullable = true) 
## |-- lag_quarterly_revenue: double (nullable = true) 
## |-- price_index: double (nullable = true) 
## |-- income_level: double (nullable = true) 
## |-- yr: integer (nullable = true) 
## |-- qr: integer (nullable = true) 

初始化sqlContext和使用Registertemptable

這就是正確的,以及寄存器中的數據幀作爲一個臨時表。爲了能夠使用sql命令,您已註冊一個表。

registerTempTable(sdf, "sdf") 

請記住,DataFrame綁定到已用於創建它的上下文。

head(tables(hiveContext)) 

## tableName isTemporary 
## 1  sdf  TRUE 

head(tables(sqlContext)) 

## [1] tableName isTemporary 
## <0 rows> (or 0-length row.names) 

最後例如查詢:

query <- "SELECT yr, qr, y, lag_quarterly_revenue AS old_lag, 
      LAG(y) OVER (ORDER BY yr, qr) AS new_lag 
      FROM sdf" 

sql(hiveContext, query) 

##  yr qr  y old_lag new_lag 
## 1 1962 1 8.79236 8.79636  NA 
## 2 1962 2 8.79137 8.79236 8.79236 
## 3 1962 3 8.81486 8.79137 8.79137 
## 4 1963 0 8.81301 8.81486 8.81486 
## 5 1963 1 8.90751 8.81301 8.81301 
## 6 1963 2 8.93673 8.90751 8.90751 
+0

thanks @ zero323:問題在於將數據框綁定到SQL上下文,我們使用SQLcontext創建了數據框,但嘗試使用Hivecontext滯後,顯然無法引用創建的數據框!感謝您對此的及時迴應! –

相關問題