cron job throwing DeadlineExceededError

I am currently using a Google Cloud project on the free trial. I have a cron job that fetches data from a data vendor and stores it in the Datastore. I wrote the code to fetch the data a few weeks ago and it all worked fine, but for the last two days I have suddenly started getting the error "DeadlineExceededError: The overall deadline for responding to the HTTP request was exceeded". I believed cron jobs were only supposed to time out after 60 minutes, so I don't understand why I am getting this error.

The cron task

def run():
    try:
        config = cron.config
        actual_data_source = config['xxx']['xxxx']
        original_data_source = actual_data_source

        company_list = cron.rest_client.load(config, "companies", '')

        if not company_list:
            logging.info("Company list is empty")
            return "Ok"

        for row in company_list:
            company_repository.save(row, original_data_source, actual_data_source)

        return "OK"
    except Exception:
        # the snippet's except clause was cut off; this matches the
        # style of the other functions
        logging.exception("Error occurred running the companies cron job")
        raise

Repository code

def save(dto, org_ds, act_dp):
    try:
        key = 'FIN/%s' % (dto['ticker'])
        company = CompanyInfo(id=key)
        company.stock_code = key
        company.ticker = dto['ticker']
        company.name = dto['name']
        company.original_data_source = org_ds
        company.actual_data_provider = act_dp
        company.put()
        return company
    except Exception:
        logging.exception("company_repository: error occurred saving the company record")
        raise

RESTClient implementation

def load(config, resource, filter):
    try:
        username = config['xxxx']['xxxx']
        password = config['xxxx']['xxxx']
        headers = {"Authorization": "Basic %s" % base64.b64encode(username + ":" + password)}

        if filter:
            from_date = filter['from']
            to_date = filter['to']
            ticker = filter['ticker']
            start_date = datetime.strptime(from_date, '%Y%m%d').strftime("%Y-%m-%d")
            end_date = datetime.strptime(to_date, '%Y%m%d').strftime("%Y-%m-%d")

        current_page = 1
        data = []

        while True:
            if filter:
                url = config['xxxx']["endpoints"][resource] % (ticker, current_page, start_date, end_date)
            else:
                url = config['xxxx']["endpoints"][resource] % (current_page)

            response = urlfetch.fetch(
                url=url,
                deadline=60,
                method=urlfetch.GET,
                headers=headers,
                follow_redirects=False,
            )
            if response.status_code != 200:
                logging.error("xxxx GET received status code %d!" % (response.status_code))
                logging.error("error happened for url: %s with headers %s", url, headers)
                return 'Sorry, xxxx API request failed', 500

            db = json.loads(response.content)

            if not db['data']:
                break

            data.extend(db['data'])

            if db['total_pages'] == current_page:
                break

            current_page += 1

        return data
    except Exception:
        logging.exception("Error occurred with xxxx API request")
        raise

Assuming you aren't being blocked or rate limited, as @momus suggests, consider dispatching a task to perform the save on each iteration of the 'while' loop in the 'load' function. That way you don't have to wait for 'load' to complete before starting the datastore updates. You could also consider using 'ndb.put_multi' instead of calling 'put()' on each instance. – snakecharmerb
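A minimal sketch of the 'ndb.put_multi' idea from this comment, assuming the 'CompanyInfo' model from the question ('save_all' is a hypothetical batch helper, not the asker's code):

from google.appengine.ext import ndb

def save_all(dtos, org_ds, act_dp):
    # save_all is a hypothetical batch variant of company_repository.save:
    # build all entities in memory first, then write them in one batched
    # RPC instead of one put() per company.
    companies = []
    for dto in dtos:
        key = 'FIN/%s' % dto['ticker']
        company = CompanyInfo(id=key)
        company.stock_code = key
        company.ticker = dto['ticker']
        company.name = dto['name']
        company.original_data_source = org_ds
        company.actual_data_provider = act_dp
        companies.append(company)
    ndb.put_multi(companies)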


Related (yes, I know it's a very different problem): https://stackoverflow.com/questions/45594018/deadlineexceedederror-the-overall-deadline-for-respond-the-http-request-w –


What kind of scaling and what instance class are you using for the service handling these cron requests? –

Answers


I would rather write this as a comment, but I need more reputation to do that.

  1. What happens when you run the actual data fetch directly, rather than through the cron job?
  2. Have you tried measuring the time delta from the start to the end of the job (see the timing sketch after this list)?
  3. Has the number of companies retrieved increased significantly?
  4. You seem to be doing some form of stock quote aggregation; could the provider have started blocking you?
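
A minimal way to measure that delta, assuming the 'run' entry point from the question ('timed_run' is a hypothetical wrapper, not the asker's code):

import logging
import time

def timed_run():
    # Log the wall-clock duration of one fetch-and-save cycle so it can
    # be compared against the request deadline.
    started = time.time()
    try:
        run()
    finally:
        logging.info("cron run took %.1f seconds", time.time() - started)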

I'd need more code to be sure, but my guess is that this is the same problem: DeadlineExceededError: The overall deadline for responding to the HTTP request was exceeded

I've modified your code so that it writes to the database after each URL fetch. If there are more pages, it relaunches itself in a deferred task, which should be well within the 10-minute timeout.

An uncaught exception in a deferred task causes it to retry, so be careful of that (see the sketch below).
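One way to control that, assuming the 'deferred' library used in the code below ('run_page' is a hypothetical wrapper, not part of this answer): raising 'deferred.PermanentTaskFailure' marks the task as failed without a retry, while any other exception triggers one.

from google.appengine.ext import deferred

def run_page(current_page=0):
    try:
        run(current_page)
    except KeyError:
        # A malformed payload won't improve on retry; raising
        # PermanentTaskFailure stops the task queue from retrying.
        raise deferred.PermanentTaskFailure("bad data on page %d" % current_page)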

I'm not clear on how actual_data_source & original_data_source work, but I think you should be able to modify that part.

The cron task

def run(current_page=0):
    try:
        config = cron.config
        actual_data_source = config['xxx']['xxxx']
        original_data_source = actual_data_source

        data, more = cron.rest_client.load(config, "companies", '', current_page)

        for row in data:
            company_repository.save(row, original_data_source, actual_data_source)

        # fetch the rest in a new deferred task
        if more:
            deferred.defer(run, current_page + 1)
    except Exception as e:
        logging.exception("run() experienced an error: %s" % e)

RESTClient implementation

def load(config, resource, filter, current_page):
    try:
        username = config['xxxx']['xxxx']
        password = config['xxxx']['xxxx']
        headers = {"Authorization": "Basic %s" % base64.b64encode(username + ":" + password)}

        if filter:
            from_date = filter['from']
            to_date = filter['to']
            ticker = filter['ticker']
            start_date = datetime.strptime(from_date, '%Y%m%d').strftime("%Y-%m-%d")
            end_date = datetime.strptime(to_date, '%Y%m%d').strftime("%Y-%m-%d")

            url = config['xxxx']["endpoints"][resource] % (ticker, current_page, start_date, end_date)
        else:
            url = config['xxxx']["endpoints"][resource] % (current_page)

        response = urlfetch.fetch(
            url=url,
            deadline=60,
            method=urlfetch.GET,
            headers=headers,
            follow_redirects=False,
        )
        if response.status_code != 200:
            logging.error("xxxx GET received status code %d!" % (response.status_code))
            logging.error("error happened for url: %s with headers %s", url, headers)
            return [], False

        db = json.loads(response.content)

        # return this page's data and whether more pages remain
        return db['data'], (db['total_pages'] != current_page)

    except Exception as e:
        logging.exception("Error occurred with xxxx API request: %s" % e)
        return [], False
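
For completeness, a hypothetical wiring of the cron entry point (the webapp2 handler and route are my assumption, not part of this answer): the cron URL only schedules the first deferred run, so the cron request itself returns immediately.

import webapp2
from google.appengine.ext import deferred

class CompaniesCronHandler(webapp2.RequestHandler):
    def get(self):
        # Schedule page 0; each deferred run re-defers the next page.
        deferred.defer(run)
        self.response.write("scheduled")

app = webapp2.WSGIApplication([('/cron/companies', CompaniesCronHandler)])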