2017-03-03 214 views
0

我想在小批量(每個csv 6000行)中將大量數據加載到PostgreSQL服務器(總共4000萬行)中的一個表中。我認爲HikariCP將會是這個目標的理想選擇。Postgresql與HikariCP性能問題

這是我通過使用 Java 8(1.8.0_65),Postgres JDBC驅動程序9.4.1211和HikariCP 2.4.3從我的數據插入中獲得的吞吐量。

6000行4分42秒。

我在做什麼錯,我該如何提高插入速度?

很少有關於我的設置更多的話:

  • 計劃在身後CORP網絡我的筆記本電腦上運行。
  • Postgres服務器9.4是具有db.m4.large和50 GB SSD的Amazon RDS。
  • 尚未在表格上創建明確的索引或主鍵。
  • 計劃異步插入每一行與大線程池來容納請求如下:

    private static ExecutorService executorService = new ThreadPoolExecutor(5, 1000, 30L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100000)); 
    

數據源配置爲:

 private DataSource getDataSource() { 
       if (datasource == null) { 
        LOG.info("Establishing dataSource"); 
        HikariConfig config = new HikariConfig(); 
        config.setJdbcUrl(url); 
        config.setUsername(userName); 
        config.setPassword(password); 
        config.setMaximumPoolSize(600);// M4.large 648 connections tops 
        config.setAutoCommit(true); //I tried autoCommit=false and manually committed every 1000 rows but it only increased 2 minute and half for 6000 rows 
        config.addDataSourceProperty("dataSourceClassName","org.postgresql.ds.PGSimpleDataSource"); 
        config.addDataSourceProperty("dataSource.logWriter", new PrintWriter(System.out)); 
        config.addDataSourceProperty("cachePrepStmts", "true"); 
        config.addDataSourceProperty("prepStmtCacheSize", "1000"); 
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048"); 
        config.setConnectionTimeout(1000); 

        datasource = new HikariDataSource(config); 
       } 
       return datasource; 
      } 

這在我讀源數據:

private void readMetadata(String inputMetadata, String source) { 
      BufferedReader br = null; 
      FileReader fr = null; 
      try { 
       br = new BufferedReader(new FileReader(inputMetadata)); 
       String sCurrentLine = br.readLine();// skip header; 
       if (!sCurrentLine.startsWith("xxx") && !sCurrentLine.startsWith("yyy")) { 
        callAsyncInsert(sCurrentLine, source); 
       } 
       while ((sCurrentLine = br.readLine()) != null) { 
        callAsyncInsert(sCurrentLine, source); 
       } 
      } catch (IOException e) { 
       LOG.error(ExceptionUtils.getStackTrace(e)); 
      } finally { 
       try { 
        if (br != null) 
         br.close(); 

        if (fr != null) 
         fr.close(); 

       } catch (IOException ex) { 
        LOG.error(ExceptionUtils.getStackTrace(ex)); 
       } 
      } 
    } 

我插入數據異步(或試圖使用JDBC!):

  private void callAsyncInsert(final String line, String source) { 
        Future<?> future = executorService.submit(new Runnable() { 
         public void run() { 
          try { 
           dataLoader.insertRow(line, source); 
          } catch (SQLException e) { 
           LOG.error(ExceptionUtils.getStackTrace(e)); 
           try { 
            errorBufferedWriter.write(line); 
            errorBufferedWriter.newLine(); 
            errorBufferedWriter.flush(); 
           } catch (IOException e1) { 
            LOG.error(ExceptionUtils.getStackTrace(e1)); 
           } 
          } 
         } 
        }); 
        try { 
         if (future.get() != null) { 
          LOG.info("$$$$$$$$" + future.get().getClass().getName()); 
         } 
        } catch (InterruptedException e) { 
         LOG.error(ExceptionUtils.getStackTrace(e)); 
        } catch (ExecutionException e) { 
         LOG.error(ExceptionUtils.getStackTrace(e)); 
        } 
       } 

我DataLoader.insertRow低於:

  public void insertRow(String row, String source) throws SQLException { 
        String[] splits = getRowStrings(row); 
        Connection conn = null; 
        PreparedStatement preparedStatement = null; 
        try { 
         if (splits.length == 15) { 
          String ... = splits[0]; 
          //blah blah blah 

          String insertTableSQL = "insert into xyz(...) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) "; 
          conn = getConnection(); 
          preparedStatement = conn.prepareStatement(insertTableSQL); 
          preparedStatement.setString(1, column1); 
          //blah blah blah 
          preparedStatement.executeUpdate(); 
          counter.incrementAndGet(); 
          //if (counter.get() % 1000 == 0) { 
           //conn.commit(); 
          //} 
         } else { 
          LOG.error("Invalid row:" + row); 
         } 
        } finally { 
         /*if (conn != null) { 
          conn.close(); //Do preparedStatement.close(); rather connection.close 
         }*/ 
         if (preparedStatement != null) { 
          preparedStatement.close(); 
         } 
        } 
       } 

當pgAdmin4監測,我注意到幾件事情:

  • 數最高每秒交易次數接近50.
  • 活動數據庫會話只有一個,會話總數爲15個。
  • 太多塊I/O(打500左右,不知道這應該是一個問題)

screenshot from pgAdmin

+0

減少連接池的大小和使用的線程的數量:更多連接(和更多線程)不一定會帶來更好的性能,甚至有一點(這可能低於您當前的設置),更多連接(和線程)實際上會導致性能和吞吐量的下降。此外,你應該**關閉你的方法中的連接,並將它返回到連接池以供重用。 –

+0

另外,你真的檢查過,是否與異步插入瓶頸,也許問題是你沒有顯示的代碼(它調用'callAsyncInsert')。 –

+0

感謝您的迴應: – bkrish

回答

2

你絕對要使用批量插入,用語句正在準備循環,並自動關閉。在僞代碼中:

PreparedStatement stmt = conn.prepareStatement("insert into xyz(...) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") 
while (<data>) { 
    stmt.setString(1, column1); 
    //blah blah blah 
    stmt.addBatch(); 
} 
stmt.executeBatch(); 
conn.commit(); 

即使單個連接上的單個線程也應該能夠插入> 5000行/秒。

更新:如果要多線程它,連接數應該是數據庫CPU核心數x1.5或2。處理線程的數量應該與此相匹配,並且每個處理線程應使用上述模式處理一個CSV文件。但是,您可能會發現許多併發插入到同一個表中的操作會在數據庫中創建太多的鎖定爭用,在這種情況下,您需要退回處理線程的數量,直到找到最佳併發性。

正確大小的池和併發應該很容易達到> 20K行/秒。

此外,請升級到HikariCP v2.6.0。

+0

多線程導入的線程數量不僅取決於服務器上的CPU數量,還取決於該服務器上的硬盤數量。 –

+0

@a_horse_with_no_name儘管如此,但使用Amazon RDS時無法知道該數字。 – brettw

+0

好的。我已經修改了每個建議的程序。升級到2.6.0。增加了批量插入並僅使用連接來加載數據。現在我看到兩種不同類型的數據集有很大的不同。數據集#1是一個csv文件中的500K行(準確地說是499951) - 00:02:08.670分鐘。數據集#2是83個CSV文件中的498K,每個6K行佔用00:02:09.674分鐘。所以我能夠獲得3840ish/sec的吞吐量。如果我沒有宏觀日誌記錄,錯誤處理等重型框架,我可能會得到更多,但我對此感到滿意。非常感謝Woolridge先生爲這個框架和馬克提供幫助。 – bkrish