2016-02-29

How can I run parallel/concurrent database queries with Groovy?

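I want to count the rows in every table of a database (select count(*) from $TABLE) and write the counts to a separate file.

Some tables have millions of rows and take several minutes to count, while many tables have no rows at all. I would like Groovy to fire off each count request concurrently, wait for the result, grab the next table in the list, and so on. I am adding this functionality to an existing Groovy script that already works.

This is what I have so far: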

retrieve_table_count_list = { objParams -> 

    aryTables = objParams.table_list 

    objDB2DBRS = [:] 
    objDB2DBRS["database_jdbc_url"] = get_config_setting(setting: "DB2DatabaseURL").toString() 

    strReturnSQL = "" 

    GParsPool.withPool(10) { pool -> 
     currentPool = pool 
     aryTables.eachWithIndexParallel { objTable, intTable -> 

      intNumTables = aryTables.size() 
      strTableName = objTable.table_name 

      strSQL="select count(*) as \"row_count\" from ${strTableName}" 

      aryRowCount = { db_commands(query: strSQL, params: objDB2DBRS) } 
      objFastRowCount = aryRowCount.async() 
      objResultRowCount = objFastRowCount() 

      intRowCount = aryRowCount[0]["row_count"] 

      strReturnSQL += "update tmp_TableCount set row_count=${intRowCount} where table_name='${strTableName}';\n" 
     } 
    } 
    return strReturnSQL 
} 


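The code above is pared down to the essentials. The original code also includes a username and password (that is the only difference) and returns this error: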

Retrieving row counts from DB2 
TABLE_NAME: select count(*) as "row_count" from TABLE_NAME: Caught: groovy.lang.MissingMethodException: No signature of method: 1500.retrieve-accurate-row-counts-db2-concurrent.async() is applicable for argument types:() values: [] 
Possible solutions: any(), any(groovy.lang.Closure), asType(java.lang.Class), run(), run(), find() 
groovy.lang.MissingMethodException: No signature of method: 1500.retrieve-accurate-row-counts-db2-concurrent.async() is applicable for argument types:() values: [] 
Possible solutions: any(), any(groovy.lang.Closure), asType(java.lang.Class), run(), run(), find() 
     at 1500.retrieve-accurate-row-counts-db2-concurrent$_run_closure2_closure3_closure4.doCall(1500.retrieve-accurate-row-counts-db2-concurrent.groovy:79) 
     at groovyx.gpars.pa.GParsPoolUtilHelper$_eachWithIndex_closure9.doCall(GParsPoolUtilHelper.groovy:182) 
     at com.sun.proxy.$Proxy6.op(Unknown Source) 
     at extra166y.AbstractParallelAnyArray$OOMPap.leafTransfer(AbstractParallelAnyArray.java:2249) 
     at extra166y.PAS$FJOMap.atLeaf(PAS.java:228) 
     at extra166y.PAS$FJBase.compute(PAS.java:78) 
     at jsr166y.RecursiveAction.exec(RecursiveAction.java:148) 
     at jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:305) 
     at jsr166y.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:575) 
     at jsr166y.ForkJoinPool.scan(ForkJoinPool.java:755) 
     at jsr166y.ForkJoinPool.work(ForkJoinPool.java:617) 
     at jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:369) 
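What problem have you run into so far?

I have tried different approaches and gotten different errors. Mostly I get errors such as "database connection is closed". I will post more specific error messages as soon as I can.

The code you posted is incomplete. Where is 'db_commands' defined?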

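Answer

There were several problems. Jeremie B's answer was very helpful, but there were other issues as well. I am not sure who should get credit for the answer. Any input is welcome!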

// PROBLEM #1: 
// db_commands was calling another closure to decrypt a password, 
// but the decryption library was not thread safe 
// solved by wrapping decryption in a "waiting loop:" 
DECRYPT_AVAILABLE = true 
// decrypt string 
decrypt = { def objParams -> 
    // if we're already decrypting a string, wait a short time, then test again 
    while (DECRYPT_AVAILABLE == false) { 
     Thread.sleep(100); 
    } 
    DECRYPT_AVAILABLE = false 
    // ... decrypt text ... 
    DECRYPT_AVAILABLE = true 
    return decryptedText; 
} 
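
A note on the waiting loop above: checking the flag and then setting it are two separate steps, so two threads can in principle both see `DECRYPT_AVAILABLE == true` and enter the decryption at the same time. A `synchronized` block is one way to close that gap. The following is only a minimal sketch, not the original script: `FakeDecryptor` and `decryptLock` are hypothetical names, and the fake class merely stands in for the real decryption library, which is not shown in this post.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for the decryption library that is not thread safe.
class FakeDecryptor {
    int active = 0       // number of threads currently inside decrypt()
    int maxActive = 0    // highest overlap seen so far

    String decrypt(String text) {
        active++
        maxActive = Math.max(maxActive, active)
        Thread.sleep(5)  // simulate some work
        active--
        return text.reverse()
    }
}

def decryptor = new FakeDecryptor()
def decryptLock = new Object()   // hypothetical lock object shared by all callers

// Only one thread at a time can hold decryptLock, so calls never overlap.
def decrypt = { String text ->
    synchronized (decryptLock) {
        return decryptor.decrypt(text)
    }
}

def pool = Executors.newFixedThreadPool(8)
def futures = (1..20).collect { n ->
    pool.submit({ decrypt('secret' + n) } as Callable)
}
def results = futures.collect { it.get() }   // waits for every task
pool.shutdown()
pool.awaitTermination(10, TimeUnit.SECONDS)

println "decrypted ${results.size()} strings, max overlap: ${decryptor.maxActive}"
```

Unlike the sleep-and-retry loop, waiting threads are blocked by the JVM rather than polling every 100 ms, and the lock is released even if the decryption throws.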

def count_table_rows = { def objParams -> 
    def strTableName = objParams.table_name 
    def objMSSQLDBRS = objParams.params 
    def intTotal = objParams.total 

    def String strSQL 
    def boolean bolCountError 
    def int intRowCount 
    def String strRowCount 
    def String strToLog 
    def objRowCount 
    def aryRowCount 
    def strReturnSQL 

    strSQL = "select count(*) as \"row_count\" from ${strTableName}" 
    objRowCount = db_commands(query: strSQL, params: objMSSQLDBRS) 

    if (objRowCount.containsKey("error") && objRowCount["error"] == true) { 
     w(string: "ERROR on SQL: " + strSQL, style: "error"); 
     w(string: objRowCount["error_message"].toString(), style: "error"); 
     System.exit(0) 
    } 

    aryRowCount = objRowCount["data"] 
    intRowCount = aryRowCount[0]["row_count"] 

    strReturnSQL = "update tmp_TableCounts set row_count=${intRowCount} where table_name='${strTableName}';\n" 

    return strReturnSQL 
} 
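
The reason every variable in `count_table_rows` is declared with `def` can be shown in isolation. In a Groovy script, assigning to a name that was never declared stores the value in the script binding, which all closures of that script share, including closures running on other threads. This is a small sketch with made-up closures (`leaky` and `scoped` are hypothetical and not part of the original script), and it has to be run as a script so that `binding` is available:

```groovy
// No 'def': the assignment lands in the script binding, which is shared.
def leaky = { String tableName ->
    strTableName = tableName
    return strTableName
}

// With 'def': the variable exists only for the duration of this call.
def scoped = { String tableName ->
    def strLocalName = tableName
    return strLocalName
}

leaky('orders')
scoped('customers')

println binding.hasVariable('strTableName')   // the undeclared name leaked into the binding
println binding.hasVariable('strLocalName')   // the declared name did not
```

When several threads run the leaky version at once, each one overwrites the same binding entry, so a thread can end up building its SQL from another thread's table name.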

def count_all_table_rows = { 
    def objPGDBRS = [:] 
    def objMSSQLDBRS = [:] 
    def strReturn 
    objPGDBRS["database_jdbc_url"] = get_config_setting(setting: "PGDatabaseURL").toString() 
    def strPGSchema = get_config_setting(setting: "PGDatabaseSchema").toString() 

    def strSQL="select schemaname, tablename from pg_tables where schemaname='${strPGSchema}' order by tablename" 

    // PROBLEM #2: 
    // db_commands can't have any variables defined within the closure that are accessible outside the closure 
    // (iow, all variables must be locally scoped) 

    def objResults = db_commands(query: strSQL, params: objPGDBRS); 

    if (objResults["error"] == true) { 
     w(string: "ERROR on SQL: " + strSQL, style: "error"); 
     w(string: objResults["error_message"].toString(), style: "error"); 
     System.exit(0) 
    } 

    def aryTables = objResults["data"].table_name 
    def strMSSQLSchema 

    objMSSQLDBRS["database_jdbc_url"] = get_config_setting(setting: "MSSQLDatabaseURL").toString() 

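    GParsPool.withPool() {
     aryTables.eachParallel { def strTableName ->
      // keep the result local to the task; a variable declared outside this closure is shared by all threads
      def strResult = count_table_rows(table_name: strTableName, params: objMSSQLDBRS, total: aryTables.size())
      // strCommands is shared across threads, so guard the append
      synchronized (aryTables) { strCommands += "$strResult" }
     }
    }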

}
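
Appending to one string from many threads is easy to get wrong, so another option is to let each task return its own statement and join the pieces once everything has finished. The sketch below uses plain `java.util.concurrent` so that it runs without GPars; GPars' `collectParallel` should serve the same purpose inside `withPool`. The names `fakeCounts`, `countRows`, and `buildUpdate` are hypothetical, and `countRows` only stands in for the real `db_commands` call.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical stand-in for db_commands: a fixed row count per table.
def fakeCounts = [orders: 1200, customers: 0, invoices: 35]
def countRows = { String tableName -> fakeCounts[tableName] }

// Each task builds and returns its own statement; nothing shared is modified.
def buildUpdate = { String tableName ->
    def intRowCount = countRows(tableName)
    return "update tmp_TableCounts set row_count=${intRowCount} where table_name='${tableName}';\n".toString()
}

def pool = Executors.newFixedThreadPool(4)
def strCommands
try {
    def tasks = fakeCounts.keySet().collect { name ->
        ({ buildUpdate(name) } as Callable)
    }
    // invokeAll blocks until all tasks are done and keeps the order of the task list
    strCommands = pool.invokeAll(tasks).collect { it.get() }.join('')
} finally {
    pool.shutdown()
}

print strCommands
```

Because the statements are joined on the calling thread, no locking is needed, and the output order matches the order of the table list regardless of which count finishes first.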