2015-10-08 15 views
0

我在Postgresql中有多個表。假設一個表有A列,B列,C列,refresh_date,會計周。 B表具有D列,E列,B列,財政周,refresh_date。我想查找表A的本週的記錄總數以及本週本週的表B的E列的總計數。我使用Talend從表A和表B中加載數據,這些表在Postgresql中。此外,如果列E在當前一週的值爲零,那麼它應該向我發送一封郵件。 我想爲此創建一個通用代碼,因爲這是針對表A和表B的,將會使用多個類似於此的表。如何在TalendPostgresql從Postgresql和Talend中的多個表中計算當前的星期記錄

回答

1

幾個月前,我已經創建了類似於您的需要的東西,它基本上執行一個隨機查詢並解析結果集並將其非規格化存儲在數據庫表中。請注意,我使用的企業Talend具有稱爲Dynamic Sc​​hema的整潔功能: https://help.talend.com/pages/viewpage.action?pageId=190513179

那麼我們應該從哪裏開始? 我典型的查詢看起來是這樣的:

select pk1 as rcr_grby_pk1, pk2 as rcr_grpby_pk2, 
     count(*) as cnt, sum(amount) as sum_amount 
from mySchema.myTable 
group by pk1, pk2 

顯然,選擇查詢可以是任何東西,可以包含任意數量的列。我們執行它,並存儲在一個表中的結果會是這樣的:

 
-------------------------------------------------------- 
| schema | table | pk  |measure_name| value_n | 
-------------------------------------------------------- 
| mySchema | myTable | foo2015 | cnt  | 1234  | 
-------------------------------------------------------- 
| mySchema | myTable | foo2015 | sum_amount | 987.65 | 
-------------------------------------------------------- 
| mySchema | myTable | bar2014 | cnt  | 4321  | 
-------------------------------------------------------- 
| mySchema | myTable | bar2014 | sum_amount | 567.89 | 
-------------------------------------------------------- 

我們區分3種基本類型:文本,數字,日期。

我想你可以有一個想法,如何編寫和存儲這些SQL查詢,以及可以傳遞給目標並存儲在目標表中的ID,以便您可以查看生成該結果的內容。

所以加工件。 tFlowToIterate - > tJavaFlex - > tLogRow

我已經將所有東西都放入了一個joblet中,因爲我們正在使用它來協調不同數據庫之間的數據。 (如Oracle和Postgres) Joblet內容:

joblet content

tJavaFlex具有輸出模式是這樣的:

tJavaFlex has an output schema like this:

tJavaFlex內容是這樣的:

開始:

Dynamic record = ((Dynamic)globalMap.get("input.line")); 

String group_by_columns = ""; 

for(int i = 0 ; i < record.getColumnCount() ; i++) { 
    DynamicMetadata meta = record.getColumnMetadata(i); 
    if(meta.getDbName().toUpperCase().startsWith("RCR_GRPBY_")){ 
     group_by_columns += "" + record.getColumnValue(i); 
    } 
} 
for(int i = 0 ; i < record.getColumnCount() ; i++) { 
    DynamicMetadata meta = record.getColumnMetadata(i); 
    if(false == meta.getDbName().toUpperCase().startsWith("RCR_GRPBY_")){ 

主:

out.grp_id = context.grp_id; 
out.job_id = context.job_id; 
out.table_test_id = context.table_test_id; 

out.group_by_columns = group_by_columns; 
out.measure_column_name = meta.getDbName().toUpperCase(); 

out.result_n = /* Float */ null; 
out.result_v = /* String */ null; 
out.result_d = /* Date */ null; 

if((record.getColumnValue(i)!=null) && (meta.getType().equals("id_String"))){ 
    out.result_v = String.valueOf(record.getColumnValue(i)); 
} else if((record.getColumnValue(i)!=null) && meta.getType().equals("id_Date")) { 
    System.out.println(String.valueOf(record.getColumnValue(i))); 
    out.result_d = (Date)record.getColumnValue(i); 
} else if((record.getColumnValue(i)!=null) && (meta.getType().equals("id_Integer") 
     || meta.getType().equals("id_Double") 
     || meta.getType().equals("id_Float") 
     || meta.getType().equals("id_Long"))) { 
    out.result_n = Float.valueOf(String.valueOf(record.getColumnValue(i))); 
} else if((record.getColumnValue(i)!=null) && (meta.getType().equals("id_BigDecimal"))) { 
    out.result_n = new BigDecimal(String.valueOf(record.getColumnValue(i))).floatValue(); 
} else { 
    //Should not happen 
    System.out.println("\n Unhandled type: " + meta.getType());  
} 

末:

} // if 
} //for 

PS:我知道浮球是一個不錯的選擇存儲號碼,但沒有時間返工它,它仍然給出可接受的結果。