2014-11-06 90 views
0

我正在寫一個java應用程序,它將一個數據庫的信息(db2)複製到另一個數據庫(sql server)。操作的順序很簡單:看到使大量的單個行更新更快或更高效

  1. 檢查任何已在一定的時間範圍內更新
  2. 抓住一切從在指定的時間框架
  3. 地圖數據庫中的信息中的第一個數據庫的POJO的POJO
  4. 鴻溝子集成線(預先定義在#屬性文件中)
  5. 線程循環通過每個POJO獨立
  6. 更新第二數據庫

我有一切正常工作,但在一天中的某些時候,需要發生的更新量(可以達到成千上萬)有一個巨大的跳躍。

下面您可以看到我的代碼的通用版本。它遵循應用程序的基本算法。對象是通用的,實際應用程序有5種不同類型的指定對象,每個對象都有自己的更新器線程類。但是下面的通用函數正是它們所有的樣子。並且在updateDatabase()方法中,它們全部被添加到threads並且全部同時運行。

private void updateDatabase() 
{ 
    List<Thread> threads = new ArrayList<>(); 
    addObjectThreads(threads);  
    startThreads(threads); 
    joinAllThreads(threads); 
} 

private void addObjectThreads(List<Thread> threads) 
{ 
    List<Object> objects = getTransformService().getObjects(); 
    logger.info("Found " + objects.size() + " Objects"); 
    createThreads(threads, objects, ObjectUpdaterThread.class); 
} 

private void createThreads(List<Thread> threads, List<?> objects, Class threadClass) 
{ 
    final int BASE_OBJECT_LOAD = 1; 
    int objectLoad = objects.size()/Database.getMaxThreads() > 0 ? objects.size()/Database.getMaxThreads() + BASE_OBJECT_LOAD : BASE_OBJECT_LOAD; 

    for (int i = 0; i < (objects.size()/objectLoad); ++i) 
    { 
     int startIndex = i * objectLoad; 
     int endIndex = (i + 1) * objectLoad; 
     try 
     { 
      List<?> objectSubList = objects.subList(startIndex, endIndex > objects.size() ? objects.size() : endIndex); 
      threads.add(new Thread((Thread) threadClass.getConstructor(List.class).newInstance(objectSubList))); 
     } 
     catch (Exception exception) 
     { 
      logger.error(exception.getMessage()); 
     } 
    } 
} 


public class ObjectUpdaterThread extends BaseUpdaterThread 
{ 
    private List<Object> objects; 
    final private Logger logger = Logger.getLogger(ObjectUpdaterThread.class); 

    public ObjectUpdaterThread(List<Object> objects) 
    { 
     this.objects = objects; 
    } 

    public void run() 
    { 
     for (Object object : objects) 
     { 
      logger.info("Now Updating Object: " + object.getId()); 
      getTransformService().updateObject(object); 
     } 
    } 
} 

所有這些轉到春季服務,看起來像下面的代碼。它的泛型也是同樣的,但每種類型的對象都具有完全相同的邏輯類型。上面的代碼中的getObjects()只是一行傳遞給DAO,因此不需要真正發佈它。

@Service 
@Scope(value = "prototype") 
public class TransformServiceImpl implements TransformService 
{ 
    final private Logger logger = Logger.getLogger(TransformServiceImpl.class); 

    @Autowired 
    private TransformDao transformDao; 

    @Override 
    public void updateObject(Object object) 
    { 
     String sql; 
     if (object.exists()) 
     { 
      sql = Object.Mapper.UPDATE; 
     } 
     else 
     { 
      sql = Object.Mapper.INSERT; 
     } 

     boolean isCompleted = false; 
     while (!isCompleted) 
     { 
      try 
      { 
       transformDao.updateObject(object, sql); 
       isCompleted = true; 
      } 
      catch (Exception exception) 
      { 
       logger.error(exception.getMessage()); 
       threadSleep(); 
       logger.info("Now retrying update for Object: " + object.getId()); 
      } 
     } 
     logger.info("Updated Object: " + object.getId()); 
    } 
} 

最後,這些都去,看起來像這樣的DAO:

@Repository 
@Scope(value = "prototype") 
public class TransformDaoImpl implements TransformDao 
{ 
    //@Resource is like @Autowired but with the added option of being able to specify the name 
    //Good for autowiring two different instances of the same class [NamedParameterJdbcTemplate] 
    //Another alternative = @Autowired @Qualifier(BEAN_NAME) 
    @Resource(name = "db2") 
    private NamedParameterJdbcTemplate db2; 

    @Resource(name = "sqlServer") 
    private NamedParameterJdbcTemplate sqlServer; 

    final private Logger logger = Logger.getLogger(TransformerImpl.class); 

    @Override 
    public void updateObject(Objet object, String sql) 
    { 
     MapSqlParameterSource source = new MapSqlParameterSource(); 
     source.addValue("column1_value", object.getColumn1Value()); 
     //put all source values from the POJO in just like above 

     sqlServer.update(sql, source); 
    } 
} 

我的INSERT語句是這樣的:

"INSERT INTO dbo.OBJECT_TABLE " + 
"(COLUMN1, COLUMN2...) " + 
"VALUES(:column1_value, :column2_value... " 

而且我的更新語句是這樣的:

"UPDATE dbo.OBJECT_TABLE SET " + 
"COLUMN1 = :column1_value, COLUMN2 = :column2_value, " + 
"WHERE PRIMARY_KEY_COLUMN = :primary_key_value" 

它的一個lo t代碼和我知道的東西,但我只是想佈置我所擁有的一切,希望能夠幫助我們更快或更高效地實現這一目標。更新很多行需要幾個小時,如果只花費幾個小時而不是幾個小時,它會很好。謝謝你的幫助。我歡迎所有有關Spring,線程和數據庫的學習經驗。

+0

如果您遇到個別行更新,它總是效率低下。你確定你不能使用基於集合的邏輯嗎? – Mansfield 2014-11-06 16:15:44

+0

最有效的方法是創建表的轉儲(並將它們導入目標)。如果您需要更新已有行的單個列,那麼您幾乎不走運;如果您更新「僅僅是因爲」,請嘗試刪除並重新插入。 – Durandal 2014-11-06 16:35:48

回答

0

如果要將大量SQL發送到服務器,則應考慮使用Statement.addBatchStatement.executeBatch方法對其進行批處理。批處理的大小是有限的(我總是將我的數據限制在64K的SQL中),但它們大大降低了往返數據庫的次數。

當我迭代並創建SQL時,我會跟蹤已經批處理的數量,當SQL跨越64K邊界時,我會啓動一個executeBatch並啓動一個新的。

你可能想用64K號試驗,它可能是一個Oracle限制,我用的時候它。

我不能說Spring,但批處理是JDBC的一部分Statement。我確信這很簡單。

+0

你讓我在正確的道路上肯定感謝。我使用了NamedParameterJdbcTemplate的batchUpdate方法,儘管仍然沒有我想要的那麼快,但它確實快了一點。那謝謝啦! – 2014-11-06 21:01:39

0
  1. 檢查,看看是否有任何已在一定的時間範圍內更新
  2. 抓住一切從在指定的時間框架

中的第一個數據庫是否有一個指數源表中的LAST_UPDATED_DATE列(或任何您正在使用的)?爲什麼不在應用程序中增加負擔,如果它在你的控制範圍之內,爲什麼不在源數據庫中編寫一些在「更新日誌」表中創建條目的觸發器?這樣,你的應用程序需要做的就是消費並執行這些條目。

您是如何管理您的交易?如果您爲每項操作創建新的交易,它將會非常緩慢。

關於線程代碼,你有沒有考慮使用更標準的東西,而不是自己寫?你所擁有的是一個非常典型的producer/consumer,Java對這種類型的事物有很好的支持,ThreadPoolExecutornumerous queue implementations在執行不同任務的線程之間移動數據。

使用現成的東西的好處是:1)經過充分測試2)有許多調整選項和尺寸調整策略,您可以調整以提高性能。

另外,而不是使用5種不同的螺紋類型需要被處理的對象的每個類型,你認爲封裝處理邏輯針對每種類型的成單獨的戰略類?這樣,您可以使用單個工作線程池(這將更容易調整大小和調整)。