
I have a large dataset that is parsed by a Ruby script. The script creates a CSV, which is then uploaded to a Redshift database. Most of the rows in the log upload successfully, but many of the uploads fail with an "Extra column(s) found" error. I have isolated the circumstances under which this happens: the data is being parsed into CSV incorrectly, which produces the "Extra column(s) found" error when the CSV is loaded into the database.

The log data looks like this:

2014-09-22 13:02:16-0400,238 {"Items":[{"PubEndDate":"2002/04/09","ItmId":"1280429264","SourceType":"Government & Official Publications","ReasonCode":"","MyResearchUser":"","ProjectCode":"","PublicationCode":"","PubStartDate":"2002/04/09","ItmFrmt":"KWIC","Subrole":"KWIC","PaymentType":"PrePaid","UsageInfo":"P-1008361-158946-STAFF-null-2195091","Role":"KWIC","RetailPrice":1.19,"EffectivePrice":0,"ParentItemId":"396489"},{"PubEndDate":"2012/04/05","ItmId":"1139461559","SourceType":"Government & Official Publications","ReasonCode":"","MyResearchUser":"","ProjectCode":"","PublicationCode":"","PubStartDate":"2012/04/05","ItmFrmt":"KWIC","Subrole":"KWIC","PaymentType":"PrePaid","UsageInfo":"P-1008365-158946-STAFF-null-2195099","Role":"KWIC","RetailPrice":0.75,"EffectivePrice":0,"ParentItemId":"396490"}]} 

I then create a CSV from it with a Ruby script that looks like this (forgive the large code block; it is a long script):

require 'json' 
require 'csv' 

# add methods to unnest ruby hashes, flattening nested json objects into prefixed keys 
class Hash 
  def unnest 
    new_hash = {} 
    each do |key, val| 
      if val.is_a?(Hash) 
        # recurse into nested hashes, joining key paths with "-" 
        new_hash.merge!(val.prefix_keys("#{key}-")) 
      else 
        new_hash[key] = val 
      end 
    end 
    new_hash 
  end 

  def prefix_keys(prefix) 
    Hash[map { |key, val| [prefix + key, val] }].unnest 
  end 
end 

def parse(usage) 
  # strip literal pipes (the CSV delimiter) and newlines from the raw line 
  usage = usage.gsub(/\|/, '').gsub(/\n/, '') 

  # array of all possible keys: make sure all fields in the db are filled, 
  # regardless of how many params are passed in the usage log 
  keys = ["UserAgent","IP","AppId","SessId","JSessionId","LangCd","UsageType","BreadCrumb","AuthType","UsageGroupId","SearchType","ResponseTime","EventType","LandedFirstPage","ReferringUrl","PubEndDate","ItmId","PubStartDate","ItmFrmt","OpenUrlRefId","OpenAccess","LinkSource","SourceType","Subrole","PremId","PaymentType","ObjectType","OrigSite","UsageInfo","Role","DeliveryMethod","ParentItemId","SearchAllProductsFlag","MarketSegment","SearchCount","SearchEngine","QryString","SubjectKey","SearchId","SearchHits","UserInfo-IP","UserInfo-AppId","UserInfo-SessId","UserInfo-UsageGroupId","SearchProductInfo","TurnAwayFlag","LinkOutTarget","LinkOutType","TranslationTime","TextSize","TextType","SourceLang","DestinationLang","ReasonCode","RetailPrice","EffectivePrice","MyResearchUser","ProjectCode","DocID","ListingType","MasterID","TerminatedSessionID","PublicationId","PublicationTitle","ItemTitle","AccessAgreementStatus"] 

  items_keys = ["ReferringUrl","PubEndDate","ItmId","SourceType","PubStartDate","PublicationCode","ItmFrmt","PaymentType","ObjectType","OrigSite","UsageInfo","OpenUrlRefId","TurnAwayFlag","OpenAccess","ParentItemId","SearchId","SearchProductInfo","EventName","HistoryId","AlertId","ReasonCode","Origin","MyResearchUser","ProjectCode","Subrole","NumberOfCopies","Role","RetailPrice","EffectivePrice","Multiplier","PublicationId","PublicationTitle","ItemTitle"] 

  # extract the date and time, then parse the json into a flattened ruby hash 
  date = usage.scan(/\d{4}-\d\d-\d\d/).first 
  time = usage.scan(/\d\d:\d\d:\d\d/).first 
  json = usage.scan(/\{.*\}/).first 
  parsed = JSON.parse(json).unnest 

  # build the shared row prefix, substituting 'NA' for all missing attributes 
  result = [] 
  items_result = [] 
  result = (0...keys.length).map { |i| parsed[keys[i]] || 'NA' } 
  result.unshift date 
  result.unshift time 
  result.push "save_space" # placeholder for the full log line, to save space 
  items = JSON.parse(json) 

  temp_result = result 

    CSV.open("testing.csv", "a+", {:col_sep => "|"}) do |csv| 
     begin 
     items["Items"].each do |item| 
      item_result = (0...items_keys.length).map{ |i| item[items_keys[i]] || "NA" } 
      temp_result = (temp_result << item_result).flatten! 
    csv << temp_result 
    temp_result = result.flatten 
     item_result = [] 
     end 
     rescue 
      item_result = (0...items_keys.length).map{ |i| "NA" } 
    temp_result = (temp_result << item_result).flatten! 
    csv << temp_result 
    temp_result = result.flatten 
     item_result = [] 
     end 
    end 
    nil 
end 

File.readlines("file.log").each do |line| 
    parse(line) 
end 
`ruby upload_csv_to_redshift.rb usage_logs_testing` 
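
For clarity, here is what the Hash#unnest patch does to a nested object from the log (a standalone illustration, assuming the patch above has been loaded):

require 'json' 

# a nested "UserInfo" object becomes flat, prefixed keys, matching 
# entries such as "UserInfo-IP" in the keys array above 
sample = JSON.parse('{"ItmId":"1280429264","UserInfo":{"IP":"1.2.3.4","AppId":"42"}}') 
puts sample.unnest.inspect 
# => {"ItmId"=>"1280429264", "UserInfo-IP"=>"1.2.3.4", "UserInfo-AppId"=>"42"} 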

This script creates a CSV that looks like this:

13:02:16|2014-09-22|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|save_space|NA|2002/04/09|1280429264|Government & Official Publications|2002/04/09|""|KWIC|PrePaid|NA|NA|P-1008361-158946-STAFF-null-2195091|NA|NA|NA|396489|NA|NA|NA|NA|NA|""|NA|""|""|KWIC|NA|KWIC|1.19|0|NA|NA|NA|NA 
13:02:16|2014-09-22|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|save_space|NA|2002/04/09|1280429264|Government & Official Publications|2002/04/09|""|KWIC|PrePaid|NA|NA|P-1008361-158946-STAFF-null-2195091|NA|NA|NA|396489|NA|NA|NA|NA|NA|""|NA|""|""|KWIC|NA|KWIC|1.19|0|NA|NA|NA|NA|NA|2012/04/05|1139461559|Government & Official Publications|2012/04/05|""|KWIC|PrePaid|NA|NA|P-1008365-158946-STAFF-null-2195099|NA|NA|NA|396490|NA|NA|NA|NA|NA|""|NA|""|""|KWIC|NA|KWIC|0.75|0|NA|NA|NA|NA 

This is uploaded to a Redshift database structured like this:

CREATE TABLE usage_logs_test 
(
log_id bigint IDENTITY (0,1), 
log_time varchar(200), 
log_date varchar(200), 
UserAgent varchar(max), 
IP varchar(max), 
AppId varchar(max), 
SessId varchar(max), 
JSessionId varchar(max), 
LangCd varchar(max), 
UsageType varchar(max), 
BreadCrumb varchar(max), 
AuthType varchar(max), 
UsageGroupId varchar(max), 
SearchType varchar(max), 
ResponseTime varchar(max), 
EventType varchar(max), 
LandedFirstPage varchar(max), 
ReferringUrl varchar(max), 
PubEndDate varchar(max), 
ItmId varchar(max), 
PubStartDate varchar(max), 
ItmFrmt varchar(max), 
OpenUrlRefId varchar(max), 
OpenAccess varchar(max), 
LinkSource varchar(max), 
SourceType varchar(max), 
Subrole varchar(max), 
PremId varchar(max), 
PaymentType varchar(max), 
ObjectType varchar(max), 
OrigSite varchar(max), 
UsageInfo varchar(max), 
Role varchar(max), 
DeliveryMethod varchar(max), 
ParentItemId varchar(max), 
SearchAllProductsFlag varchar(max), 
MarketSegment varchar(max), 
SearchCount varchar(max), 
SearchEngine varchar(max), 
QryString varchar(max), 
SubjectKey varchar(max), 
SearchId varchar(max), 
SearchHits varchar(max), 
UserInfo_IP varchar(max), 
UserInfo_AppId varchar(max), 
UserInfo_SessId varchar(max), 
UserInfo_UsageGroupId varchar(max), 
SearchProductInfo varchar(max), 
TurnAwayFlag varchar(max), 
LinkOutTarget varchar(max), 
LinkOutType varchar(max), 
TranslationTime varchar(max), 
TextSize varchar(max), 
TextType varchar(max), 
SourceLang varchar(max), 
DestinationLang varchar(max), 
ReasonCode varchar(max), 
RetailPrice varchar(max), 
EffectivePrice varchar(max), 
MyResearchUser varchar(max), 
ProjectCode varchar(max), 
DocID varchar(max), 
ListingType varchar(max), 
MasterID varchar(max), 
TerminatedSessionID varchar(max), 
PublicationId varchar(max), 
PublicationTitle varchar(max), 
ItemTitle varchar(max), 
AccessAgreementStatus varchar(max), 
full_log varchar(max), 

-- per-item fields, mirroring the items_keys array in the parser 
ReferringUrl varchar(max), 
PubEndDate varchar(max), 
ItmId varchar(max), 
SourceType varchar(max), 
PubStartDate varchar(max), 
PublicationCode varchar(max), 
ItmFrmt varchar(max), 
PaymentType varchar(max), 
ObjectType varchar(max), 
OrigSite varchar(max), 
UsageInfo varchar(max), 
OpenUrlRefId varchar(max), 
TurnAwayFlag varchar(max), 
OpenAccess varchar(max), 
ParentItemId varchar(max), 
SearchId varchar(max), 
SearchProductInfo varchar(max), 
EventName varchar(max), 
HistoryId varchar(max), 
AlertId varchar(max), 
ReasonCode varchar(max), 
Origin varchar(max), 
MyResearchUser varchar(max), 
ProjectCode varchar(max), 
Subrole varchar(max), 
NumberOfCopies varchar(max), 
Role varchar(max), 
RetailPrice varchar(max), 
EffectivePrice varchar(max), 
Multiplier varchar(max), 
PublicationId varchar(max), 
PublicationTitle varchar(max), 
ItemTitle varchar(max), 
OrigId varchar(200) 
); 
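
For reference, "Extra column(s) found" is the error Redshift's COPY command raises when a row contains more pipe-delimited fields than the target table has columns; rejected rows land in the stl_load_errors system table with that err_reason. The upload script issues something along these lines (a sketch: the bucket path and credentials are placeholders):

COPY usage_logs_test 
FROM 's3://your-bucket/testing.csv' 
CREDENTIALS 'aws_access_key_id=<key>;aws_secret_access_key=<secret>' 
DELIMITER '|'; 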

The problem seems to be that a lot of the data gets duplicated, as if the temp_result array is not being reset back to the value of result at the end of the items["Items"].each block.

I realize this is a very big question, but I have gone to great lengths to simplify and generalize it as much as possible while keeping the code sample working.

Answer


You need to remember that arrays are passed by reference in Ruby. Try this:

a = ["YO"] 
b = a 
b << "HEY" 
puts a.inspect 
# => ["YO", "HEY"] 

Now consider this line in your script:

temp_result = result 

and, later:

(temp_result << item_result).flatten! 

temp_result is never cleared back to the original value of result, because they both refer to the same array in memory. The original value of result is gone, overwritten through the use of the in-place append method <<.

The quickest fix is:

temp_result = result.clone 

Likewise, unless you know exactly why you are doing it, you don't want to use bang methods in assignments; e.g. where you wrote flatten!, you want flatten.
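
Applied to the script, a minimal sketch of the corrected loop (assuming the rest of parse is unchanged) would be:

CSV.open("testing.csv", "a+", {:col_sep => "|"}) do |csv| 
  items["Items"].each do |item| 
    item_result = (0...items_keys.length).map { |i| item[items_keys[i]] || "NA" } 
    # clone the shared prefix so << never mutates result itself 
    csv << (result.clone << item_result).flatten 
  end 
end 

This writes one row per item, each consisting of the shared prefix plus that item's fields, and result stays intact between iterations.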


A better way to show that a variable containing an array, and another variable it is assigned to, both point to the same array in memory is to use object_id: a = []; b = a. At that point a.object_id # => 70098065043200 and b.object_id # => 70098065043200. (Or some other id, but they will both be the same.) – 2014-10-03 17:22:25


I don't think that's better, because it relies on implicit knowledge. It's entirely possible to get through a Ruby career without knowing what #object_id represents. If your goal is to maximize the number of readers who will be able to follow it, demonstration beats description. – SLD 2014-10-03 17:35:08


This community is a big part of why I love programming. Thank you all so much. – johncorser 2014-10-03 20:02:48
