2016-11-07 63 views
1

我想獲得火花運行的應用程序輸出,並找不到一個直接的方式做到這一點。如何使用默認羣集管理器讓工作人員登錄spark?

基本上我在談論羣集工作者上<spark install dir>/work目錄的內容。

我可以將該目錄複製到我需要的位置,但在100500個節點的情況下,它根本無法縮放。

我正在考慮的另一個選擇是附加一個退出函數(如bash中的TRAP),以從每個工作人員獲取日誌作爲應用程序運行的一部分。我只是覺得必須有比這更好的解決方案。

是的,我知道我可以使用YARN或Mesos集羣管理器來獲取日誌,但對我來說這似乎很奇怪,爲了做這樣一個方便的事情,我不能使用默認的集羣管理器。

非常感謝。

回答

0

在我去了以下解決方案結束(蟒蛇):

import os 
import tarfile 
from io import BytesIO 
from pyspark.sql import SparkSession 


# Get the spark app. 
spark = SparkSession.builder.appName("my-spark-app").getOrCreate() 
# Get the executor working directories. 
spark_home = os.environ.get('SPARK_HOME') 
if spark_home: 
    num_workers = 0 
    with open(os.path.join(spark_home, 'conf', 'slaves'), 'r') as f: 
     for line in f: 
      num_workers += 1 
    if num_workers: 
     executor_logs_path = '/where/to/store/executor_logs' 

     def _map(worker): 
      '''Returns the list of tuples of the name and the tar.gz of the worker log directory in binary format 
      for the corresponding worker. 
      ''' 
      flo = BytesIO() 
      with tarfile.open(fileobj=flo, mode="w:gz") as tar: 
       tar.add(os.path.join(spark_home, 'work'), arcname='work') 
      return [('worker_%d_dir.tar.gz' % worker, flo.getvalue()),] 

     def _reduce(worker1, worker2): 
      '''Appends the worker name and its log tar.gz's into the list. 
      ''' 
      worker1.extend(worker2) 
      return worker1 

     os.makedirs(executor_logs_path) 
     logs = spark.sparkContext.parallelize(range(num_workers), num_workers).map(_map).reduce(_reduce) 
     with tarfile.open(os.path.join(executor_logs_path, 'logs.tar'), 'w') as tar: 
      for name, data in logs: 
       info = tarfile.TarInfo(name=name) 
       info.size=len(data) 
       tar.addfile(tarinfo=info, fileobj=BytesIO(data)) 

一對夫婦的憂慮,但:

  • 不知道,如果使用的map-reduce技術是最好的方式收集日誌
  • 文件(tarball)正在內存中創建,所以根據您的應用程序它可以粉碎如果文件太大
  • 也許有更好的確定工人人數的方法