我正在嘗試開發一個使用pika和線程模塊的Python 3.6腳本。Python - 在獨立線程中運行的函數之間傳遞函數(回調)變量
我有一個問題,我認爲是由我的A)是非常新的Python和編碼一般,和B)我不理解如何在函數之間傳遞變量時,他們在單獨的線程中運行,並已通過接收函數名稱末尾的括號中的參數。
我認爲這是因爲當我不使用線程時,我可以簡單地通過調用接收函數名稱並提供要傳遞的變量,在括號中顯示一個基本示例如下:
def send_variable():
body = "this is a text string"
receive_variable(body)
def receive_variable(body):
print(body)
這在運行時,打印:
this is a text string
代碼的工作版本,我需要得到具有線程工作如下所示 - 本採用直板功能(無螺紋)和我我使用pika接收來自(RabbitMQ)q的消息ueue通過鼠兔回調函數,然後我在「回調」功能接收消息的身體傳遞給「處理功能」:
import pika
...mq connection variables set here...
# defines username and password credentials as variables set at the top of this script
credentials = pika.PlainCredentials(mq_user_name, mq_pass_word)
# defines mq server host, port and user credentials and creates a connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, port=mq_port, credentials=credentials))
# creates a channel connection instance using the above settings
channel = connection.channel()
# defines the queue name to be used with the above channel connection instance
channel.queue_declare(queue=mq_queue)
def callback(ch, method, properties, body):
# passes (body) to processing function
body_processing(body)
# sets channel consume type, also sets queue name/message acknowledge settings based on variables set at top of script
channel.basic_consume(callback, queue=mq_queue, no_ack=mq_no_ack)
# tells the callback function to start consuming
channel.start_consuming()
# calls the callback function to start receiving messages from mq server
callback()
# above deals with pika connection and the main callback function
def body_processing(body):
...code to send a pika message every time a 'body' message is received...
這工作得很好,但是我希望把這種在腳本中運行使用線程。當我這樣做時,我必須提供參數'channel'給在其自己的線程中運行的函數名 - 當我然後嘗試包含'body'參數以便'processing_function'看起來如下:
def processing_function(channel, body):
我得到一個錯誤說:
[function_name] is missing 1 positional argument: 'body'
我知道,使用線程時,有需要更多的代碼,我已經列入我用下面線程的實際代碼,這樣你可以看到我在做什麼:
...imports and mq variables and pika connection details are set here...
def get_heartbeats(channel):
channel.queue_declare(queue=queue1)
#print (' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
process_body(body)
#print (" Received %s" % (body))
channel.basic_consume(callback, queue=queue1, no_ack=no_ack)
channel.start_consuming()
def process_body(channel, body):
channel.queue_declare(queue=queue2)
#print (' [*] Waiting for Tick messages. To exit press CTRL+C')
# sets the mq host which pika client will use to send a message to
connection = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host))
# create a channel connection instance
channel = connection.channel()
# declare a queue to be used by the channel connection instance
channel.queue_declare(queue=order_send_queue)
# send a message via the above channel connection settings
channel.basic_publish(exchange='', routing_key=send_queue, body='Test Message')
# send a message via the above channel settings
# close the channel connection instance
connection.close()
def manager():
# Channel 1 Connection Details - =======================================================================================
credentials = pika.PlainCredentials(mq_user_name, mq_password)
connection1 = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, credentials=credentials))
channel1 = connection1.channel()
# Channel 1 thread =====================================================================================================
t1 = threading.Thread(target=get_heartbeats, args=(channel1,))
t1.daemon = True
threads.append(t1)
# as this is thread 1 call to start threading is made at start threading section
# Channel 2 Connection Details - =======================================================================================
credentials = pika.PlainCredentials(mq_user_name, mq_password)
connection2 = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, credentials=credentials))
channel2 = connection2.channel()
# Channel 2 thread ====================================================================================================
t2 = threading.Thread(target=process_body, args=(channel2, body))
t2.daemon = True
threads.append(t2)
t2.start() # as this is thread 2 - we need to start the thread here
# Start threading
t1.start() # start the first thread - other threads will self start as they call t1.start() in their code block
for t in threads: # for all the threads defined
t.join() # join defined threads
manager() # run the manager module which starts threads that call each module
這在運行時產生錯誤
process_body() missing 1 required positional argument: (body)
,我不明白這是爲什麼或如何解決它。
感謝您花時間閱讀此問題,並且您可以提供任何幫助或建議,我們對此表示讚賞。
請記住,我是python和編碼的新手,所以可能需要拼寫出來的東西,而不是能夠理解更加神祕的回覆。
謝謝!