2016-09-14 57 views
15

我有一個多進程的Web服務器,其進程永遠不會結束,我想檢查我的代碼覆蓋整個項目在實時環境(不僅來自測試)。python在永無止境的過程中運行覆蓋

問題是,由於過程沒有結束,我沒有一個好的地方來設置cov.start() cov.stop() cov.save()掛鉤。

因此,我想過產生一個線程,在無限循環中將保存併合並覆蓋率數據,然後睡眠一段時間,但是這種方法不起作用,覆蓋報告似乎是空的,除了睡眠線。

我很樂意收到關於如何獲得我的代碼覆蓋範圍的任何建議, 或任何有關爲什麼我的想法不起作用的建議。這裏是我的代碼片段:

import coverage 
cov = coverage.Coverage() 
import time 
import threading 
import os 

class CoverageThread(threading.Thread): 
    _kill_now = False 
    _sleep_time = 2 

@classmethod 
def exit_gracefully(cls): 
    cls._kill_now = True 

def sleep_some_time(self): 
    time.sleep(CoverageThread._sleep_time) 

def run(self): 
    while True: 
     cov.start() 
     self.sleep_some_time() 
     cov.stop() 
     if os.path.exists('.coverage'): 
      cov.combine() 
     cov.save() 
     if self._kill_now: 
      break 
    cov.stop() 
    if os.path.exists('.coverage'): 
     cov.combine() 
    cov.save() 
    cov.html_report(directory="coverage_report_data.html") 
    print "End of the program. I was killed gracefully :)" 
+1

你到底想要測量什麼? 「覆蓋範圍」專指測試。您是否試圖查看代碼中哪些位實際執行? –

+0

你想檢查你的.py文件是否仍然存在嗎? –

+1

是的,代碼庫是巨大的,我們想看看哪些區域的代碼永遠不會到達,如果這些區域存在,刪除它們,或檢查它們爲什麼無法訪問。 –

回答

8

顯然,不可能很好地控制coverageThreads。 一旦開始不同的線程,停止Coverage對象將停止所有覆蓋,start將只在「開始」線程中重新啓動它。 因此,除了CoverageThread以外,所有Thread的代碼基本上都會在2秒後停止覆蓋。

我玩了一下API,可以在不停止Coverage對象的情況下訪問測量。 因此,您可以使用API​​啓動一個線程來定期保存覆蓋率數據。 第一個實現將是像在這個

import threading 
from time import sleep 
from coverage import Coverage 
from coverage.data import CoverageData, CoverageDataFiles 
from coverage.files import abs_file 

cov = Coverage(config_file=True) 
cov.start() 


def get_data_dict(d): 
    """Return a dict like d, but with keys modified by `abs_file` and 
    remove the copied elements from d. 
    """ 
    res = {} 
    keys = list(d.keys()) 
    for k in keys: 
     a = {} 
     lines = list(d[k].keys()) 
     for l in lines: 
      v = d[k].pop(l) 
      a[l] = v 
     res[abs_file(k)] = a 
    return res 


class CoverageLoggerThread(threading.Thread): 
    _kill_now = False 
    _delay = 2 

    def __init__(self, main=True): 
     self.main = main 
     self._data = CoverageData() 
     self._fname = cov.config.data_file 
     self._suffix = None 
     self._data_files = CoverageDataFiles(basename=self._fname, 
              warn=cov._warn) 
     self._pid = os.getpid() 
     super(CoverageLoggerThread, self).__init__() 

    def shutdown(self): 
     self._kill_now = True 

    def combine(self): 
     aliases = None 
     if cov.config.paths: 
      from coverage.aliases import PathAliases 
      aliases = PathAliases() 
      for paths in self.config.paths.values(): 
       result = paths[0] 
       for pattern in paths[1:]: 
        aliases.add(pattern, result) 

     self._data_files.combine_parallel_data(self._data, aliases=aliases) 

    def export(self, new=True): 
     cov_report = cov 
     if new: 
      cov_report = Coverage(config_file=True) 
      cov_report.load() 
     self.combine() 
     self._data_files.write(self._data) 
     cov_report.data.update(self._data) 
     cov_report.html_report(directory="coverage_report_data.html") 
     cov_report.report(show_missing=True) 

    def _collect_and_export(self): 
     new_data = get_data_dict(cov.collector.data) 
     if cov.collector.branch: 
      self._data.add_arcs(new_data) 
     else: 
      self._data.add_lines(new_data) 
     self._data.add_file_tracers(get_data_dict(cov.collector.file_tracers)) 
     self._data_files.write(self._data, self._suffix) 

     if self.main: 
      self.export() 

    def run(self): 
     while True: 
      sleep(CoverageLoggerThread._delay) 
      if self._kill_now: 
       break 

      self._collect_and_export() 

     cov.stop() 

     if not self.main: 
      self._collect_and_export() 
      return 

     self.export(new=False) 
     print("End of the program. I was killed gracefully :)") 

一個更穩定的版本可以在這個GIST找到。 這段代碼基本上抓住了收集器收集的信息而不停止它。 get_data_dict函數採用Coverage.collector中的字典並彈出可用數據。這應該是足夠安全的,所以你不會失去任何測量。
報告文件每隔_delay秒更新一次。

但是,如果您有多個進程正在運行,您需要額外付出努力以確保所有進程都運行CoverageLoggerThread。這是patch_multiprocessing函數,從coverage猴子補丁中修補了猴子...
代碼位於GIST。它基本上用自定義進程替換原始進程,在運行run方法之前啓動CoverageLoggerThread,並在進程結束時加入線程。 腳本main.py允許使用線程和進程啓動不同的測試。

有2/3的缺點,這個代碼,你需要小心的:

  • 這是一個壞主意,同時使用combine功能,因爲它執行comcurrent讀取/寫入/刪除訪問.coverage.*文件。這意味着功能export不是非常安全。它應該沒問題,因爲數據是多次複製的,但在生產中使用它之前我會做一些測試。

  • 數據一旦導出,它就會保留在內存中。所以如果代碼基數很大,它可能會吃掉一些資源。可以轉儲所有數據並重新加載它,但我認爲如果您想每2秒記錄一次,則不會每次都重新加載所有數據。如果延遲幾分鐘,我會每次創建一個新的_data,使用CoverageData.read_file重新加載此過程的先前狀態。

  • 定製過程將等待_delay完成之前,我們在過程結束時加入CoverageThreadLogger所以,如果你有很多快速的過程,要增加睡眠的粒度能夠檢測該過程的結束更快。它只需要在_kill_now上打破的自定義睡眠循環。

讓我知道這是否以某種方式幫助你,或者如果有可能改善這個要點。


編輯: 看來你不需要猴子修補多模塊自動啓動記錄器。在你的Python使用.pth安裝,您可以使用一個環境變量來對新的流程自動啓動記錄器:

# Content of coverage.pth in your site-package folder 
import os 
if "COVERAGE_LOGGER_START" in os.environ: 
    import atexit 
    from coverage_logger import CoverageLoggerThread 
    thread_cov = CoverageLoggerThread(main=False) 
    thread_cov.start() 
    def close_cov() 
     thread_cov.shutdown() 
     thread_cov.join() 
    atexit.register(close_cov) 

然後,您可以用COVERAGE_LOGGER_START=1 python main.y

+0

感謝您提出這個問題。它在覆蓋範圍內使用了許多非公共接口。py代碼庫。 coverage.py提供的公共API可以使這更加可支持? –

+0

另外,爲什麼要組合?爲什麼不將每個快照寫入單獨的文件,然後再組合它們? –

+0

覆蓋範圍可以提出一個導出功能,將當前數據保存在文件中,並且不會停止「Coverage」對象。如果您拍攝「收集器」數據(基本上是我在此實施的)的快照,那麼做到這一點非常簡單。我在此代碼中調用了合併,因爲OP正在這樣做。它使得它更加複雜,但是如果跨越多個進程,則保持文件數量低。此外,代碼傾向於重寫文件,所以儘早使用「combine」避免信息丟失。這取決於您隨着時間的推移和管理的流程數量。 –

3

既然你願意爲不同的測試運行您的代碼,爲什麼不加入的方式結束了測試的過程?這似乎比試圖破解覆蓋更簡單。

+0

因爲應該以無縫的方式將數據收集到最終用戶,所以想法是收集來自公司周圍的多個用戶的數據併合並他們的結果,同時允許他們工作以一種常規的方式。此外,如果我會強制重新啓動或停止進程,這會影響會損害結果的行爲。 –

1

啓動覆蓋記錄儀可以直接使用pyrasite,與遵循兩個程序。

# start.py 
import sys 
import coverage 

sys.cov = cov = coverage.coverage() 
cov.start() 

而這一次

# stop.py 
import sys 

sys.cov.stop() 
sys.cov.save() 
sys.cov.html_report() 

另一種方式去將跟蹤使用lptrace即使它只打印調用它可能是有用的程序。