2016-09-17 183 views
-1

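How does threading.join() detect a timeout?

We are running a large Python code base that randomly scans the parameter space of some physics models (so a minimal example is very hard to give, sorry). Evaluating one parameter point takes about 300 ms, but sometimes (I don't know why) an evaluation suddenly takes hours, which kills our CPU budget on the computing cluster.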

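So my idea was to use threading to give the evaluation of each parameter point a maximum run time. If an evaluation takes longer than that, I can ignore the point as impractical. Right now this does not seem to work. I start the calculation in a new thread and join it to the main thread with a timeout of, say, 1 second, but the main thread still waits for the calculation to finish (which takes far longer than 1 second).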

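How is this possible? How does threading measure how long the new thread has already been running? I should add that I make heavy use of nlopt, numpy and scipy while evaluating a parameter point. As far as I know, most of that is not written directly in Python but relies on compiled binaries to speed up the computation. Does this affect threading (since those functions are a "black box" to it)?

Thanks!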

+0

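Have you read the documentation for `join()` (https://docs.python.org/3.5/library/threading.html#threading.Thread.join)? Quote: *As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened: if the thread is still alive, the join() call timed out.* – Bakuriu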

+0

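Also: the standard interface does not provide any way to kill a thread. You are better off using *multiprocessing*, whose workers are easier to kill. – Bakuriu

Answers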

0

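Short answer:

`Thread.join()` waits for at most the given timeout, but it does not tell you whether that timeout expired. You have to check that yourself.

In either case, a minimal code snippet would help in finding a working solution. This is mostly a guess, but if the main thread never checks whether the timeout expired, it will simply carry on as if the worker had finished.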
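The check described above can be sketched as follows. This is only an illustration: `slow_evaluation` is a hypothetical stand-in for one parameter-point evaluation, and the durations are arbitrary.

```python
import threading
import time


def slow_evaluation():
    # Hypothetical stand-in for one evaluation that runs too long.
    time.sleep(0.5)


# daemon=True so a worker that is still running cannot keep the program alive.
worker = threading.Thread(target=slow_evaluation, daemon=True)
worker.start()

start = time.monotonic()
result = worker.join(timeout=0.1)  # gives up waiting after about 0.1 s
waited = time.monotonic() - start

# join() always returns None, so the only way to detect a timeout is is_alive().
timed_out = worker.is_alive()
print(f"join returned {result!r} after {waited:.2f}s, timed_out={timed_out}")
```

Note that the timeout only ends the wait. The worker thread keeps running after `join()` returns.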

Longer answer:

Let's see where the timeout parameter goes:

https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L1060

self._wait_for_tstate_lock(timeout=max(timeout, 0)) 

https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L1062-L1074

def _wait_for_tstate_lock(self, block=True, timeout=-1): 
    # Issue #18808: wait for the thread state to be gone. 
    # At the end of the thread's life, after all knowledge of the thread 
    # is removed from C data structures, C code releases our _tstate_lock. 
    # This method passes its arguments to _tstate_lock.acquire(). 
    # If the lock is acquired, the C code is done, and self._stop() is 
    # called. That sets ._is_stopped to True, and ._tstate_lock to None. 
    lock = self._tstate_lock 
    if lock is None:  # already determined that the C code is done
        assert self._is_stopped
    elif lock.acquire(block, timeout):
        lock.release()
        self._stop()

If there is no lock, make sure the thread has stopped. Otherwise, acquire the lock with the given `block` and `timeout` arguments.
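The `(block, timeout)` call shape passed to `acquire()` behaves the same way on an ordinary `threading.Lock`, which makes the return value easy to observe. A minimal sketch, with illustrative variable names that do not come from CPython:

```python
import threading

lock = threading.Lock()
lock.acquire()  # hold the lock so that the next attempt has to wait

# Blocking acquire with a timeout: wait at most 0.1 s, then give up.
got_it = lock.acquire(True, 0.1)
print(got_it)  # False, because the lock was still held when the timeout elapsed

lock.release()

# The lock is free now, so the same call succeeds without waiting.
got_it_now = lock.acquire(True, 0.1)
print(got_it_now)  # True
lock.release()
```

The boolean result is what `_wait_for_tstate_lock` branches on. `join()` itself discards it, which is why the caller has to fall back on `is_alive()`.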

https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L117

def acquire(self, blocking=True, timeout=-1): 
    """Acquire a lock, blocking or non-blocking. 
    When invoked without arguments: if this thread already owns the lock, 
    increment the recursion level by one, and return immediately. Otherwise, 
    if another thread owns the lock, block until the lock is unlocked. Once 
    the lock is unlocked (not owned by any thread), then grab ownership, set 
    the recursion level to one, and return. If more than one thread is 
    blocked waiting until the lock is unlocked, only one at a time will be 
    able to grab ownership of the lock. There is no return value in this 
    case. 
    When invoked with the blocking argument set to true, do the same thing 
    as when called without arguments, and return true. 
    When invoked with the blocking argument set to false, do not block. If a 
    call without an argument would block, return false immediately; 
    otherwise, do the same thing as when called without arguments, and 
    return true. 
    When invoked with the floating-point timeout argument set to a positive 
    value, block for at most the number of seconds specified by timeout 
    and as long as the lock cannot be acquired. Return true if the lock has 
    been acquired, false if the timeout has elapsed. 
    """ 
    me = get_ident() 
    if self._owner == me:
        self._count += 1
        return 1
    rc = self._block.acquire(blocking, timeout)
    if rc:
        self._owner = me
        self._count = 1
    return rc

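To acquire the lock, the method first looks up the identity of the calling thread. If that thread already owns the lock, it only increments the recursion count.

Otherwise it really acquires the underlying lock, through `self._block.acquire(blocking, timeout)`.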
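The two paths through this `acquire()` method can be observed with `threading.RLock`. This is a small sketch under the assumption that the public `RLock` follows the semantics of the listing above; the helper `try_from_other_thread` is hypothetical.

```python
import threading

rlock = threading.RLock()
rlock.acquire()  # the main thread now owns the lock

# Same owner: only the recursion count goes up, so this returns immediately.
reentered = rlock.acquire(timeout=0.1)

results = []


def try_from_other_thread():
    # Different thread: the underlying lock is held, so this waits 0.1 s and fails.
    results.append(rlock.acquire(timeout=0.1))


other = threading.Thread(target=try_from_other_thread)
other.start()
other.join()

# Release once per successful acquire made by the main thread.
rlock.release()
rlock.release()

print(bool(reentered), results)
```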

https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L98

self._block = _allocate_lock() 

https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L33

_allocate_lock = _thread.allocate_lock 

https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L4

import _thread 

https://github.com/python/cpython/blob/7b90e3674be86479c51faf872d0b9367c9fc2f96/Modules/_threadmodule.c#L1300-L1301

static PyMethodDef thread_methods[] = { 
    {"start_new_thread",  (PyCFunction)thread_PyThread_start_new_thread, 
    METH_VARARGS, start_new_doc}, 
    {"start_new",    (PyCFunction)thread_PyThread_start_new_thread, 
    METH_VARARGS, start_new_doc}, 
    {"allocate_lock",   (PyCFunction)thread_PyThread_allocate_lock, 
    METH_NOARGS, allocate_doc}, 
    {"allocate",    (PyCFunction)thread_PyThread_allocate_lock, 
    METH_NOARGS, allocate_doc}, 
    {"exit_thread",    (PyCFunction)thread_PyThread_exit_thread, 
    METH_NOARGS, exit_doc}, 
    {"exit",     (PyCFunction)thread_PyThread_exit_thread, 
    METH_NOARGS, exit_doc}, 
    {"interrupt_main",   (PyCFunction)thread_PyThread_interrupt_main, 
    METH_NOARGS, interrupt_doc}, 
    {"get_ident",    (PyCFunction)thread_get_ident, 
    METH_NOARGS, get_ident_doc}, 
    {"_count",     (PyCFunction)thread__count, 
    METH_NOARGS, _count_doc}, 
    {"stack_size",    (PyCFunction)thread_stack_size, 
    METH_VARARGS, stack_size_doc}, 
    {"_set_sentinel",   (PyCFunction)thread__set_sentinel, 
    METH_NOARGS, _set_sentinel_doc}, 
    {NULL,      NULL}   /* sentinel */ 
}; 

This table defines the `allocate_lock` method as a `PyCFunction` implemented by `thread_PyThread_allocate_lock`.

https://github.com/python/cpython/blob/7b90e3674be86479c51faf872d0b9367c9fc2f96/Modules/_threadmodule.c#L1128-L1131

static PyObject * 
thread_PyThread_allocate_lock(PyObject *self) 
{ 
    return (PyObject *) newlockobject(); 
} 

https://github.com/python/cpython/blob/7b90e3674be86479c51faf872d0b9367c9fc2f96/Modules/_threadmodule.c#L538-L553

static lockobject * 
newlockobject(void) 
{ 
    lockobject *self; 
    self = PyObject_New(lockobject, &Locktype); 
    if (self == NULL)
        return NULL;
    self->lock_lock = PyThread_allocate_lock();
    self->locked = 0;
    self->in_weakreflist = NULL;
    if (self->lock_lock == NULL) {
        Py_DECREF(self);
        PyErr_SetString(ThreadError, "can't allocate lock");
        return NULL;
    }
    return self; 
} 

This allocates a new lock object and the underlying lock it wraps.

https://github.com/python/cpython/blob/2d264235f6e066611b412f7c2e1603866e0f7f1b/Python/thread_pthread.h#L276

PyThread_type_lock 
PyThread_allocate_lock(void) 
{ 
    sem_t *lock; 
    int status, error = 0; 

    dprintf(("PyThread_allocate_lock called\n")); 
    if (!initialized)
        PyThread_init_thread();

    lock = (sem_t *)PyMem_RawMalloc(sizeof(sem_t));

    if (lock) {
        status = sem_init(lock,0,1);
        CHECK_STATUS("sem_init");

        if (error) {
            PyMem_RawFree((void *)lock);
            lock = NULL;
        }
    }

    dprintf(("PyThread_allocate_lock() -> %p\n", lock)); 
    return (PyThread_type_lock)lock; 
} 

https://github.com/python/cpython/blob/2d264235f6e066611b412f7c2e1603866e0f7f1b/Python/thread.c#L60-L77

void 
PyThread_init_thread(void) 
{ 
#ifdef Py_DEBUG 
    char *p = Py_GETENV("PYTHONTHREADDEBUG"); 

    if (p) {
        if (*p)
            thread_debug = atoi(p);
        else
            thread_debug = 1;
    }
#endif /* Py_DEBUG */
    if (initialized)
        return;
    initialized = 1; 
    dprintf(("PyThread_init_thread called\n")); 
    PyThread__init_thread(); 
} 

https://github.com/python/cpython/blob/2d264235f6e066611b412f7c2e1603866e0f7f1b/Python/thread_pthread.h#L170-L176

static void 
PyThread__init_thread(void) 
{ 
#if defined(_AIX) && defined(__GNUC__) 
    extern void pthread_init(void); 
    pthread_init(); 
#endif 
} 

https://github.com/python/cpython/blob/f243de2bc8d940316ce8da778ec02a7bbe594de1/configure.ac#L3416

AC_CHECK_FUNCS(alarm accept4 setitimer getitimer bind_textdomain_codeset chown \ 
clock confstr ctermid dup3 execv faccessat fchmod fchmodat fchown fchownat \ 
fexecve fdopendir fork fpathconf fstatat ftime ftruncate futimesat \ 
futimens futimes gai_strerror getentropy \ 
getgrouplist getgroups getlogin getloadavg getpeername getpgid getpid \ 
getpriority getresuid getresgid getpwent getspnam getspent getsid getwd \ 
if_nameindex \ 
initgroups kill killpg lchmod lchown lockf linkat lstat lutimes mmap \ 
memrchr mbrtowc mkdirat mkfifo \ 
mkfifoat mknod mknodat mktime mremap nice openat pathconf pause pipe2 plock poll \ 
posix_fallocate posix_fadvise pread \ 
pthread_init pthread_kill putenv pwrite readlink readlinkat readv realpath renameat \ 
select sem_open sem_timedwait sem_getvalue sem_unlink sendfile setegid seteuid \ 
setgid sethostname \ 
setlocale setregid setreuid setresuid setresgid setsid setpgid setpgrp setpriority setuid setvbuf \ 
sched_get_priority_max sched_setaffinity sched_setscheduler sched_setparam \ 
sched_rr_get_interval \ 
sigaction sigaltstack siginterrupt sigpending sigrelse \ 
sigtimedwait sigwait sigwaitinfo snprintf strftime strlcpy symlinkat sync \ 
sysconf tcgetpgrp tcsetpgrp tempnam timegm times tmpfile tmpnam tmpnam_r \ 
truncate uname unlinkat unsetenv utimensat utimes waitid waitpid wait3 wait4 \ 
wcscoll wcsftime wcsxfrm wmemcmp writev _getpty) 

http://man7.org/linux/man-pages/man7/pthreads.7.html

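All of that leads to two questions: is the timeout a float? And are you checking `is_alive()` afterwards? From the documentation:

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened: if the thread is still alive, the join() call timed out.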
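Detecting the timeout does not stop the worker, and, as noted in the comments, a thread cannot be killed from outside. The multiprocessing route suggested there might look like the sketch below. `evaluate_point` and `run_with_timeout` are hypothetical names, the sleep stands in for the real evaluation, and the explicit "fork" start method is an assumption that the code runs on a POSIX system such as a Linux cluster.

```python
import multiprocessing
import time

# Assumption: POSIX system, so the "fork" start method is available.
ctx = multiprocessing.get_context("fork")


def evaluate_point(seconds):
    # Hypothetical stand-in for evaluating one parameter point.
    time.sleep(seconds)


def run_with_timeout(seconds, timeout):
    """Return True if the evaluation finished in time, False if it was killed."""
    proc = ctx.Process(target=evaluate_point, args=(seconds,))
    proc.start()
    proc.join(timeout)       # wait at most `timeout` seconds
    if proc.is_alive():      # still running, so the join timed out
        proc.terminate()     # unlike a thread, a process can be stopped
        proc.join()          # reap the terminated process
        return False
    return True


if __name__ == "__main__":
    print(run_with_timeout(0.01, 5.0))  # finishes well within the limit
    print(run_with_timeout(30.0, 0.5))  # exceeds the limit and is terminated
```

A separate process may also help with the behaviour described in the question. If the compiled code inside nlopt, numpy or scipy holds the interpreter lock for the whole call, which is a guess and not something the question confirms, the main thread cannot resume after its `join()` timeout until that call returns. A child process does not share that lock with its parent.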