顯然,不可能很好地控制coverage
多Threads
。 一旦開始不同的線程,停止Coverage
對象將停止所有覆蓋,start
將只在「開始」線程中重新啓動它。 因此,除了CoverageThread
以外,所有Thread
的代碼基本上都會在2秒後停止覆蓋。
我玩了一下API,可以在不停止Coverage
對象的情況下訪問測量。 因此,您可以使用API啓動一個線程來定期保存覆蓋率數據。 第一個實現將是像在這個
import threading
from time import sleep
from coverage import Coverage
from coverage.data import CoverageData, CoverageDataFiles
from coverage.files import abs_file
cov = Coverage(config_file=True)
cov.start()
def get_data_dict(d):
"""Return a dict like d, but with keys modified by `abs_file` and
remove the copied elements from d.
"""
res = {}
keys = list(d.keys())
for k in keys:
a = {}
lines = list(d[k].keys())
for l in lines:
v = d[k].pop(l)
a[l] = v
res[abs_file(k)] = a
return res
class CoverageLoggerThread(threading.Thread):
_kill_now = False
_delay = 2
def __init__(self, main=True):
self.main = main
self._data = CoverageData()
self._fname = cov.config.data_file
self._suffix = None
self._data_files = CoverageDataFiles(basename=self._fname,
warn=cov._warn)
self._pid = os.getpid()
super(CoverageLoggerThread, self).__init__()
def shutdown(self):
self._kill_now = True
def combine(self):
aliases = None
if cov.config.paths:
from coverage.aliases import PathAliases
aliases = PathAliases()
for paths in self.config.paths.values():
result = paths[0]
for pattern in paths[1:]:
aliases.add(pattern, result)
self._data_files.combine_parallel_data(self._data, aliases=aliases)
def export(self, new=True):
cov_report = cov
if new:
cov_report = Coverage(config_file=True)
cov_report.load()
self.combine()
self._data_files.write(self._data)
cov_report.data.update(self._data)
cov_report.html_report(directory="coverage_report_data.html")
cov_report.report(show_missing=True)
def _collect_and_export(self):
new_data = get_data_dict(cov.collector.data)
if cov.collector.branch:
self._data.add_arcs(new_data)
else:
self._data.add_lines(new_data)
self._data.add_file_tracers(get_data_dict(cov.collector.file_tracers))
self._data_files.write(self._data, self._suffix)
if self.main:
self.export()
def run(self):
while True:
sleep(CoverageLoggerThread._delay)
if self._kill_now:
break
self._collect_and_export()
cov.stop()
if not self.main:
self._collect_and_export()
return
self.export(new=False)
print("End of the program. I was killed gracefully :)")
一個更穩定的版本可以在這個GIST找到。 這段代碼基本上抓住了收集器收集的信息而不停止它。 get_data_dict
函數採用Coverage.collector
中的字典並彈出可用數據。這應該是足夠安全的,所以你不會失去任何測量。
報告文件每隔_delay
秒更新一次。
但是,如果您有多個進程正在運行,您需要額外付出努力以確保所有進程都運行CoverageLoggerThread
。這是patch_multiprocessing
函數,從coverage
猴子補丁中修補了猴子...
代碼位於GIST。它基本上用自定義進程替換原始進程,在運行run
方法之前啓動CoverageLoggerThread
,並在進程結束時加入線程。 腳本main.py
允許使用線程和進程啓動不同的測試。
有2/3的缺點,這個代碼,你需要小心的:
這是一個壞主意,同時使用combine
功能,因爲它執行comcurrent讀取/寫入/刪除訪問.coverage.*
文件。這意味着功能export
不是非常安全。它應該沒問題,因爲數據是多次複製的,但在生產中使用它之前我會做一些測試。
數據一旦導出,它就會保留在內存中。所以如果代碼基數很大,它可能會吃掉一些資源。可以轉儲所有數據並重新加載它,但我認爲如果您想每2秒記錄一次,則不會每次都重新加載所有數據。如果延遲幾分鐘,我會每次創建一個新的_data
,使用CoverageData.read_file
重新加載此過程的先前狀態。
定製過程將等待_delay
完成之前,我們在過程結束時加入CoverageThreadLogger
所以,如果你有很多快速的過程,要增加睡眠的粒度能夠檢測該過程的結束更快。它只需要在_kill_now
上打破的自定義睡眠循環。
讓我知道這是否以某種方式幫助你,或者如果有可能改善這個要點。
編輯: 看來你不需要猴子修補多模塊自動啓動記錄器。在你的Python使用.pth
安裝,您可以使用一個環境變量來對新的流程自動啓動記錄器:
# Content of coverage.pth in your site-package folder
import os
if "COVERAGE_LOGGER_START" in os.environ:
import atexit
from coverage_logger import CoverageLoggerThread
thread_cov = CoverageLoggerThread(main=False)
thread_cov.start()
def close_cov()
thread_cov.shutdown()
thread_cov.join()
atexit.register(close_cov)
然後,您可以用COVERAGE_LOGGER_START=1 python main.y
你到底想要測量什麼? 「覆蓋範圍」專指測試。您是否試圖查看代碼中哪些位實際執行? –
你想檢查你的.py文件是否仍然存在嗎? –
是的,代碼庫是巨大的,我們想看看哪些區域的代碼永遠不會到達,如果這些區域存在,刪除它們,或檢查它們爲什麼無法訪問。 –