2016-12-14 38 views
2

我正在使用散景服務器繪製從傳感器開始的在線數據。我寫了一個multiprocessing.Process子類,它從傳感器讀取數據,然後通過管道將其傳送到另一個子類,該子類應使用傳入數據更新散景圖。更新來自多處理過程的散景圖

如何使繪圖Process異步地從管道中讀取並將其繪製到Bokeh?

考慮Grapher類,假設另一個進程已經將數據發送到input_pipe:這裏

from multiprocessing import Process 

from bokeh.client import push_session 
from bokeh.models import ColumnDataSource 
from bokeh.plotting import curdoc, figure 

from functools import partial 

class Grapher(Process): 
    def __init__(self, name, input_pipe): 
    super(Grapher, self).__init__(name=name, daemon=True) 
    self.input_pipe = input_pipe 

    self.doc = curdoc() 
    self.source = ColumnDataSource(dict(time=[], value=[]) 
    self.fig = figure() 
    self.fig.line(source=self.source, x='time', y='value') 
    self.doc.add_root(self.fig) 
    self.session = push_session(self.doc) # To keep session updated. 

    def run(self): 
    while True: 
     time, value = self.input_pipe.recv() 
     self.doc.add_next_tick_callback(partial(self.update, time, value)) 

    @gen.coroutine 
    def update(self, time, value): 
    self.source.stream(dict(time=[time], value=[value])) 

代碼是從背景虛化文檔的example採納。另外我運行bokeh serve,這樣curdoc()有東西可以連接。從Bokeh服務器日誌中,我看到該連接在那裏。

但是,問題似乎是update並沒有真正在while循環的下一個勾號後執行。我可以通過在update內添加日誌消息來檢查該消息,該消息從未打印出來。

如果我嘗試將add_next_tick_callback更改爲update調用,該函數確實運行,但會話不會繪製任何內容。

什麼會導致此問題?代碼看起來合乎邏輯,而且我無法在這種方法可能失敗的情況下使用文檔。

謝謝。

回答

1

當使用bokeh.client方法的應用程序,如果你想的事件進行維修,並且回調發生的時候被調用,然後你必須調用阻塞函數session.loop_until_closed()末。這就是保持監視事件和調用回調的東西,實際上呆在周圍,以便監視事件和調用回調。

如果調用阻塞函數是有問題的,你不希望使用bokeh serve app.py風格的應用程序,那麼你可以嘗試調用session.loop_until_closed()在另一個線程(不是100%,這將工作),或開始自己的龍捲風ioloop並直接捎帶一個Bokeh服務器應用程序。該技術將被證明在即將到來的0.12.4版本更好,但你會發現這款筆記本現在的有用參考:

https://gist.github.com/bryevdv/ff84871fcd843aceea4f0197be9c57e0

注意,筆記本是一個概念證明,你需要一個0.12.4開發版爲它工作,一些使用細節可能會改變。