2013-01-10 37 views
2

現在基本上我創建了三個類。如何將這些新消息傳遞給另一個類

public void run() { 
int seqId = 0; 
while(true) { 
    List<KamMessage> list = null; 
    try { 
     list = fullPoll(seqId); 
    } catch (Exception e1) { 
     e1.printStackTrace(); 
    } 
    if (!list.isEmpty()) { 
     seqId = list.get(0).getSequence(); 
     incomingMessages.addAll(list); 
     System.out.println("waiting 3 seconds"); 
     System.out.println("new incoming message"); 
    } 
    try { 
     Thread.sleep(3000); 
     System.out.println("new incoming message"); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    } 
} 
public List<KamMessage> fullPoll(int lastSeq) throws Exception { 
Statement st = dbConnection.createStatement(); 
ResultSet rs = st.executeQuery("select * from msg_new_to_bde where ACTION = 804 and SEQ >" + 
lastSeq + "order by SEQ DESC");   
List<KamMessage> pojoCol = new ArrayList<KamMessage>(); 
    while (rs.next()) { 
    KamMessage filedClass = convertRecordsetToPojo(rs); 
    pojoCol.add(filedClass); 
    } 
for (KamMessage pojoClass : pojoCol) { 
    System.out.print(" " + pojoClass.getSequence()); 
    System.out.print(" " + pojoClass.getTableName()); 
    System.out.print(" " + pojoClass.getAction()); 
    System.out.print(" " + pojoClass.getKeyInfo1()); 
    System.out.print(" " + pojoClass.getKeyInfo2()); 
    System.out.println(" " + pojoClass.getEntryTime()); 
    }    
return pojoCol; 
    } 

下面是類: 1.Poller-確實輪詢和從分貝通行證新的數據到控制器

2.Controller-這個類有一個線程池,其同時調用輪詢並且具有要從處理器請求的新數據

3.處理器 - 該類必須查找新數據,處理它並將其返回給控制器。

所以現在我的問題是如何實現第三階段...

這裏是我的控制器類:

public class RunnableController { 

/** Here This Queue initializes the DB and have the collection of incoming message 
*      
*/ 
    private static Collection<KpiMessage> incomingQueue = new ArrayList<KpiMessage>(); 
    private Connection dbConncetion; 
    public ExecutorService threadExecutor; 
    private void initializeDb() 
    { 
    //catching exception must be adapted - generic type Exception prohibited 
    DBhandler conn = new DBhandler(); 
    try { 
     dbConncetion = conn.initializeDB(); 
    } catch (Exception e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } 
    } 


private void initialiseThreads() 
{   
    try { 

     threadExecutor = Executors.newFixedThreadPool(10); 
      PollingSynchronizer read = new PollingSynchronizer(incomingQueue, dbConncetion); 
     threadExecutor.submit(read); 

    }catch (Exception e){ 
    e.printStackTrace(); 
    } 

} 

@SuppressWarnings("unused") 
private void shutDownThreads() 
{   
    try { 
     threadExecutor.shutdown(); 
     //DB handling should be moved to separate DB class 
     dbConncetion.close(); 

    }catch (Exception e){ 
    e.printStackTrace(); 
    } 

} 

/** Here This Queue passes the messages and have the collection of outgoing message 
* 
*/ 

//private Collection<KpiMessage> outgingQueue = new ArrayList<KpiMessage>(); 
//have to implement something here for future 

    public static void main(String[] args) throws InterruptedException { 
    RunnableController controller = new RunnableController(); 

    System.out.println(incomingQueue.size()); 

    controller.initializeDb(); 
    controller.initialiseThreads(); 

    Thread.sleep(3000); 
    System.out.println("Polling"); 

    } 

} 

回答

4

我會建議使用的BlockingQueue這樣做,而不是一個簡單的ArrayList 。只需更改您的incomingQueue變量的類型。然後你就可以有另一個線程(或線程池)做這樣的事情

//pseudocode 
while (true) { 
    // it polls data from the incomingQueue that shares with the producers 
    KpiMessage message = this.incomingQueue.take() 

    //Then process the message and produces an output... you can put that output in a different queue as well for other part of the code to pick it up 
} 

上BlockingQueues一個很好的例子可以在這裏找到http://www.javamex.com/tutorials/blockingqueue_example.shtml

+1

順便說一句,你是一個典型的生產者 - 消費者問題。只需谷歌瞭解如何在java 5中解決它(以前的sdks上沒有隊列,並且使用監視器是相當煩人的),你會得到大量的例子。 – Claudio

+0

謝謝你的意思是說,我應該使用隊列而不是我的Poller類中的列表... !! – Babu

+1

是的,原因很多: 1 - 常用數組列表不是線程安全的,因此除非您同步它,否則您將遇到競爭條件問題 2-消費者可以阻止和被動地等待新消息由製片人放在隊列中 – Claudio