2012-07-10 110 views
1

我使用多個同步線程來處理多個事務,並通過 Hibernate 框架和鎖定機制從數據庫中讀取並處理這些事務。問題是,每個線程第一次處理時都正確,但下一次處理時就會拋出 SQL 01002 和 SQL 72000 異常。請看代碼 - 使用 Hibernate 和線程鎖定

// Worker-loop body. Opens one Tx_Core session and one staging session for this
// thread, then repeatedly: begins a core transaction, fetches the next
// UpdateRecord via getURForUpdates(...), processes it inside a staging
// transaction, and commits or rolls back both transactions. The loop runs until
// doIhaveMoreTime(...) returns false; both sessions are closed in the finally.
long _StartTimeForMe = System.currentTimeMillis();

    boolean _contextPushed;
    txContext =
        GlobalFunctions.generateId(
            true,
            DataBrokerConstants.CONTEXT_PREFIX,
            null,
            null);
    txContext = this.getName() + txContext;
    logger_U.pushContext(txContext);
    _contextPushed = true;

    try
    {
        coreSession = openTxCoreSession();
    }
    catch (Exception e1)
    {
        logger_U.info("exception while creating Tx_Core session");
        e1.printStackTrace();
        return;
    }

    try
    {
        stagSession = openStagSession();
    }
    catch (RuntimeException e2)
    {
        logger_U.info("exception while creating stag session");
        e2.printStackTrace();
        // The core session was opened successfully above; release it before
        // bailing out so it is not leaked.
        closeSession(coreSession);
        return;
    }

    String _txContextBackup = txContext;

    int i = 1;
    long _timeElapsedByMe = System.currentTimeMillis() - _StartTimeForMe;

    try
    {
        while (doIhaveMoreTime(_timeElapsedByMe))
        {
            // Each iteration gets its own logging context: <base>-<iteration>.
            txContext = _txContextBackup + "-" + (i);
            logger_U.removeContext();
            logger_U.pushContext(txContext);

            coreTransaction = null;
            stagTransaction = null;

            InnerClassUpdate _rec = null;
            UpdateRecord _r = null;
            if (null != coreSession)
            {
                coreTransaction = coreSession.beginTransaction();
                _rec = getURForUpdates(coreSession);
            }
            else
            {
                logger_U.error("coreSession is not created.");
                break;
            }

            if (null != _rec)
            {
                _r = _rec.record;
                logger_U.info(
                    "record found. Going to process Record: ID = "
                        + _r.getId()
                        + " TX_IDENTIFIER = "
                        + _r.getTxIdentifier()
                        + " UPDATE_TYPE = "
                        + _r.getUpdateType());

                // If a record of same transaction is processing or has been processed
                // before in same run, it will not run again in this run
                if (isTxProcessing(_r.getTxIdentifier()))
                {
                    if (coreTransaction != null
                        && !coreTransaction.wasCommitted()
                        && !coreTransaction.wasRolledBack())
                    {
                        coreTransaction.rollback();
                    }

                    if (stagTransaction != null
                        && !stagTransaction.wasCommitted()
                        && !stagTransaction.wasRolledBack())
                    {
                        stagTransaction.rollback();
                    }

                    logger_U.debug(
                        "A record of Tx_Identifier : "
                            + _r.getTxIdentifier()
                            + " is either processing or has been processed in this run.");
                }
                else
                {
                    try
                    {
                        stagTransaction = stagSession.beginTransaction();
                        if (processUpdates(_rec))
                        {
                            try
                            {
                                if (stagTransaction != null
                                    && !stagTransaction.wasCommitted()
                                    && !stagTransaction.wasRolledBack())
                                {
                                    stagTransaction.commit();

                                    logger_U.debug("stag commit success");
                                }
                                if (coreTransaction != null
                                    && !coreTransaction.wasCommitted()
                                    && !coreTransaction.wasRolledBack())
                                {
                                    coreTransaction.commit();

                                    logger_U.debug("core commit success");
                                }

                            }
                            catch (HibernateException e)
                            {
                                logger_U.debug("error while commit");
                                e.printStackTrace();

                                // A failed commit must not leave a transaction open:
                                // roll back whatever was not committed so its locks
                                // and cursors do not leak into the next iteration.
                                if (stagTransaction != null
                                    && !stagTransaction.wasCommitted()
                                    && !stagTransaction.wasRolledBack())
                                {
                                    stagTransaction.rollback();
                                }
                                if (coreTransaction != null
                                    && !coreTransaction.wasCommitted()
                                    && !coreTransaction.wasRolledBack())
                                {
                                    coreTransaction.rollback();
                                }
                            }

                        }
                        else
                        {
                            if (stagTransaction != null
                                && !stagTransaction.wasCommitted()
                                && !stagTransaction.wasRolledBack())
                            {
                                stagTransaction.rollback();
                            }

                            logger_U.debug("stagTransaction rollback...");
                            if (coreTransaction != null
                                && !coreTransaction.wasCommitted()
                                && !coreTransaction.wasRolledBack())
                            {
                                coreTransaction.rollback();
                            }

                            logger_U.debug("coreTransaction rolback...");
                        }
                    }
                    catch (Exception e1)
                    {
                        if (stagTransaction != null
                            && !stagTransaction.wasCommitted()
                            && !stagTransaction.wasRolledBack())
                        {
                            stagTransaction.rollback();
                        }

                        if (coreTransaction != null
                            && !coreTransaction.wasCommitted()
                            && !coreTransaction.wasRolledBack())
                        {
                            coreTransaction.rollback();
                        }

                        logger_U.error(
                            "Exception while processing UpdateRecord.");
                        e1.printStackTrace();
                        // NOTE(review): Hibernate documents that a Session which has
                        // thrown an exception must be discarded. The loop keeps
                        // reusing coreSession/stagSession after a failure, which may
                        // be related to the errors seen on later iterations -
                        // consider closing and reopening both sessions here.
                    }
                }
            }
            else
            {
                if (stagTransaction != null
                    && !stagTransaction.wasCommitted()
                    && !stagTransaction.wasRolledBack())
                {
                    stagTransaction.rollback();
                }

                if (coreTransaction != null
                    && !coreTransaction.wasCommitted()
                    && !coreTransaction.wasRolledBack())
                {
                    coreTransaction.rollback();
                }

                logger_U.info(
                    "No record found. wait for "
                        + convertMilliesToSec(
                            DataBrokerConstants.SMALL_SLEEP)
                        + "Sec.");
                Thread.sleep(DataBrokerConstants.SMALL_SLEEP);
                i++;
                _timeElapsedByMe =
                    System.currentTimeMillis() - _StartTimeForMe;

                continue;
            }

            // No matter record is processed successfully or not, it will not be
            // processed again in this run
            if (_rec != null && vectForUpdate.contains(_rec))
            {
                logger_U.debug(
                    "UpdateRecord of ID "
                        + _r.getId()
                        + " and TX_IDENTIFIER "
                        + _r.getTxIdentifier()
                        + " is removing from vectForUpdate.");
                removeObjectFromVect(_rec);
            }
            Thread.sleep(1000);
            _timeElapsedByMe = System.currentTimeMillis() - _StartTimeForMe;
            i++;
        } //END while loop
    }
    catch (Exception e)
    {
        try
        {
            if (stagTransaction != null
                && !stagTransaction.wasCommitted()
                && !stagTransaction.wasRolledBack())
            {
                stagTransaction.rollback();
            }

            if (coreTransaction != null
                && !coreTransaction.wasCommitted()
                && !coreTransaction.wasRolledBack())
            {
                coreTransaction.rollback();
            }

        }
        catch (HibernateException e3)
        {
            // Rollback itself failed; nothing more can be done beyond reporting it.
            e3.printStackTrace();
        }
        logger_U.error("Unknown Exception occured while processing.");
        e.printStackTrace();

        // Thread.sleep may have been interrupted; restore the flag so the
        // owner of this thread can still observe the interruption.
        if (e instanceof InterruptedException)
        {
            Thread.currentThread().interrupt();
        }
    }
    finally
    {
        // Restore the base logging context and release both sessions.
        txContext = _txContextBackup;
        logger_U.removeContext();
        logger_U.pushContext(txContext);
        closeSession(coreSession);
        closeSession(stagSession);
    }
+0

在我看來,你真正需要的是事務和隔離級別。 – duffymo 2012-07-10 11:45:11

+0

「使用鎖定概念」是什麼意思? Hibernate 和您的數據庫引擎在設計時就已考慮了事務和併發,它們本身就是線程安全的。Session 不是線程安全的,但你本來就不應該在線程之間共享同一個 Session。 – 2012-07-10 11:50:59

回答

2

當使用Hibernate時,您應該在多線程環境中非常小心。

  • 您應該驗證的第一件事是事務被正確提交,並執行SQL。如果你不能通過閱讀你的代碼來驗證它,你可以增加Hibernate的日誌級別來驗證它。

  • Hibernate Session不是線程安全的,你只能在線程間共享SessionFactory。您還必須驗證每次打開會話時都注意關閉它,因爲這不會自動發生,並且會阻止新線程打開新會話。

+0

感謝您的寶貴答覆。我正在使用多個會話:每個線程創建自己的會話,並在循環內開始處理;循環內每次都會創建新的事務。請看下面的示例代碼 - – user1514616 2012-07-10 11:56:23

+0

你在哪裏關閉會話? – Edmondo1984 2012-07-10 12:12:46

+0

在finally塊 – user1514616 2012-07-10 12:16:02