2013-02-18 179 views
18
import java.util.concurrent.CountDownLatch; 

    import quickfix.Initiator; 


    public class UserSession { 
    private final CountDownLatch latch = new CountDownLatch(1); 

public String await() { 
     try { 
      System.out.println("waiting..."); 
      if (latch.await(5, TimeUnit.SECONDS)) 
       System.out.println("released!"); 
      else 
       System.out.println("timed out"); 
      return secret; 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      System.out.println(e.getMessage()); 
      e.printStackTrace(); 
     } 
     return null; 
    } 

    public void countdown(String s) { 
     System.out.println("In countdown: "+s+ ". Latch count: "+latch.getCount()); 
     secret = s; 
     latch.countDown(); 
     System.out.println("Latch count: "+latch.getCount()); 
    } 
    } 


    public class LogonHandler extends AbstractHandler { 

    public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) 
     throws IOException, ServletException 
     { 
      Map<String,String[]> query = request.getParameterMap(); 

      if (query.containsKey("method")) { 
       if (query.get("method")[0].compareTo(method) == 0) { 
        baseRequest.setHandled(true); 
        response.getWriter().println(logon(query)); 
       } 
      } 
      else 
       baseRequest.setHandled(false); 
     } 

    private String logon(Map<String,String[]> query) { 
     if (query.containsKey("username") && query.containsKey("password") &&   query.containsKey("sendercompid")) { 

      app.mapUser(query.get("sendercompid")[0], new UserSession(query.get("username")[0], query.get("password")[0])); 

      SessionID session = new SessionID(new BeginString("FIX.4.4"), new SenderCompID(query.get("sendercompid")[0]), new TargetCompID("PARFX")); 

      try { 
       ThreadedSocketInitiator tsi = new ThreadedSocketInitiator(app, app.getFileStoreFactory(), settings, app.getLogFactory(), app.getMessageFactory()); 
       UserSession userSession = new UserSession(query.get("username")[0], query.get("password")[0]); 
       userSession.setInitiator(tsi); 

       tsi.start(); 
       return userSession.await(); 
      } catch (ConfigError e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
       return e.toString(); 
      } 
     } 
     return "fail"; 
    } 
    } 


public class QuickfixjApplication implements Application { 
    private Map<String,UserSession> users = new HashMap<String,UserSession>(); 

    public void mapUser(String s, UserSession u) { 
     users.put(s, u); 
    } 

    public void toAdmin(Message message, SessionID sessionId) { 

     try { 
      if (message.getHeader().getField(new StringField(MsgType.FIELD)).valueEquals(Logon.MSGTYPE)) { 
       UserSession user = users.get(sessionId.getSenderCompID()); 
       message.setField(new Username(user.getUsername())); 
       message.setField(new Password(user.getPassword())); 
      } 
     } catch (FieldNotFound e) { 
      e.printStackTrace(); 
     } 
    } 

    public void fromAdmin(Message message, SessionID sessionId) 
     throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon { 

     if (message.getHeader().getField(new StringField(MsgType.FIELD)).valueEquals(Logon.MSGTYPE)) { 
      System.out.println(message.toString()); 
      UserSession user = users.get(sessionId.getSenderCompID()); 
      user.countdown(message.toString()); 
     } 
    } 
} 

好的,我試過只在這裏包含最少量的代碼。有三個有趣的類,UserSession是Jetty處理程序和QuickFix/j應用程序之間的內部粘連。需要等待異步API回調,然後再從Java中的方法返回

LogonHandler收到HTTP登錄請求並嘗試將用戶登錄到QuickFix/j應用程序會話。

QuickFix/j正在向FIX服務器發送登錄消息,此登錄請求/響應是異步的。 HTTP登錄請求當然是同步的。因此,在我們從HTTP請求返回之前,我們必須等待來自FIX服務器的回覆。我使用CountDownLatch和此UserSession對象執行此操作。

當我創建QuickFix/j會話對象時,我還創建了一個UserSession對象並將它添加到映射中(發生在LogonHandler登錄方法中)。

QuickFix/j應用程序對象toAdmin()和fromAdmin()中有兩個回調。在fromAdmin()中檢查消息是否是登錄響應,如果是,則調用UserSession的方法來倒計時。在調試代碼時,我看到fromAdmin()方法被觸發,在映射中找到UserSession對象,並調用countdown()方法,並且latch.getCount()從1變爲0,但latch.await )方法在UserSession中await()永遠不會返回。它總是超時。

回答

36

您可以使用CountDownLatch這樣的:

public class LogonHandler implements Handler { 
    private final CountDownLatch loginLatch = new CountDownLatch (1); 

    private boolean callbackResults; 

    public void serverResponseCallback(boolean result) { 
     callbackResults = result; 
     loginLatch.countDown(); 
    } 

    public boolean tryLogon(Credentials creds) throws InterruptedException { 
     SomeServer server = new SomeServer(address); 
     server.tryLogon (creds.getName(), creds.getPass()); 
     loginLatch.await(); 
     return callbackResults; 
    } 
} 

如果你想限制由等待時間,例如,5秒,然後代替loginLatch.await()使用以下命令:

if (loginLatch.await (5L, TimeUnit.SECONDS)) 
    return callbackResults; 
else 
    return false; // Timeout exceeded 
+0

感謝米哈伊爾,這就是我的嘗試。 loginLatch.count()變爲0,但await()永遠不會釋放。 – shaz 2013-02-18 05:34:31

+0

@shaz'CountDownLatch'應該可以工作。可能錯誤在代碼的其他位置。你可以在這裏用'CountDownLatch'發佈你的代碼嗎? – 2013-02-18 06:02:59

+0

上面貼的代碼。謝謝。 – shaz 2013-02-18 15:48:57