2017-10-17 141 views
4

我正在嘗試建立一個長時間運行的Pull訂閱Google雲PubSub主題。 我使用非常相似的文檔here,即在給出的示例代碼:Google PubSub python客戶端返回StatusCode.UNAVAILABLE

def receive_messages(project, subscription_name): 
    """Receives messages from a pull subscription.""" 
    subscriber = pubsub_v1.SubscriberClient() 
    subscription_path = subscriber.subscription_path(
     project, subscription_name) 

    def callback(message): 
     print('Received message: {}'.format(message)) 
     message.ack() 

    subscriber.subscribe(subscription_path, callback=callback) 

    # The subscriber is non-blocking, so we must keep the main thread from 
    # exiting to allow it to process messages in the background. 
    print('Listening for messages on {}'.format(subscription_path)) 
    while True: 
     time.sleep(60) 

的問題是,我收到有時以下回溯:

Exception in thread Consumer helper: consume bidirectional stream: 
Traceback (most recent call last): 
    File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner 
    self.run() 
    File "/usr/lib/python3.5/threading.py", line 862, in run 
    self._target(*self._args, **self._kwargs) 
    File "/path/to/google/cloud/pubsub_v1/subscriber/_consumer.py", line 248, in _blocking_consume 
    self._policy.on_exception(exc) 
    File "/path/to/google/cloud/pubsub_v1/subscriber/policy/thread.py", line 135, in on_exception 
    raise exception 
    File "/path/to/google/cloud/pubsub_v1/subscriber/_consumer.py", line 234, in _blocking_consume 
    for response in response_generator: 
    File "/path/to/grpc/_channel.py", line 348, in __next__ 
    return self._next() 
    File "/path/to/grpc/_channel.py", line 342, in _next 
    raise self 
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])> 

我看到這個在another question中被引用,但在這裏我要求如何在Python中正確處理它。我試圖在異常中包裝請求,但它似乎在後臺運行,並且在出現該錯誤時我無法重試。

回答

5

對我來說有點冒險的方法是自定義policy_class。默認的有on_exception函數,忽略DEADLINE_EXCEEDED。您可以創建一個繼承默認值的類,並且也會忽略UNAVAILABLE。我的是這樣:

from google.cloud import pubsub 
from google.cloud.pubsub_v1.subscriber.policy import thread 
import grpc 

class AvailablePolicy(thread.Policy): 
    def on_exception(self, exception): 
     """The parent ignores DEADLINE_EXCEEDED. Let's also ignore UNAVAILABLE. 

     I'm not sure what triggers that error, but if you ignore it, your 
     subscriber seems to work just fine. It's probably an intermittent 
     thing and it reconnects later if you just give it a chance. 
     """ 
     # If this is UNAVAILABLE, then we want to retry. 
     # That entails just returning None. 
     unavailable = grpc.StatusCode.UNAVAILABLE 
     if getattr(exception, 'code', lambda: None)() == unavailable: 
      return 
     # For anything else, fallback on super. 
     super(AvailablePolicy, self).on_exception(exception) 

subscriber = pubsub.SubscriberClient(policy_class=AvailablePolicy) 
# Continue to set up as normal. 

它看起來很像originalon_exception會忽略不同的錯誤。如果你願意,每當引發異常時你都可以添加一些日誌記錄,並驗證一切仍然有效。未來的消息仍然會通過。

+0

FWIW,大約一個小時後出現[cpu問題](https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3965)。我認爲這個問題在每次忽略錯誤時都是線程泄漏(其他人在DEADLINE_EXCEEDED之後得到它),但是我找不到修復程序。我回到使用舊的api,並做我自己的定期拉,而不是使用流的東西。 [這個例子](https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-python/blob/master/cmdline-pull/pubsub_sample.py)對設置起來很有用。 –

+0

感謝您的回覆......對於那些應該非常簡單的東西來說,這看起來太過於誇張了。對於手頭的任務,我最終使用了'golang'客戶端庫,它的功能如同魅力 – adrpino

+0

[github上的相關問題](https://github.com/GoogleCloudPlatform/google-cloud-python/issues/2683) – Blackus

相關問題