
I have a pyspark script that works fine. The script fetches data from MySQL and creates Hive tables in HDFS, and I collect the logs for it with a shell script.

The pyspark script is below.

#!/usr/bin/env python 
import sys 
from pyspark import SparkContext, SparkConf 
from pyspark.sql import HiveContext 
conf = SparkConf() 
sc = SparkContext(conf=conf) 
sqlContext = HiveContext(sc) 

#Condition to specify exact number of arguments in the spark-submit command line 
if len(sys.argv) != 8: 
    print "Invalid number of args......" 
    print "Usage: spark-submit import.py Arguments" 
    exit() 
table = sys.argv[1] 
hivedb = sys.argv[2] 
domain = sys.argv[3] 
port=sys.argv[4] 
mysqldb=sys.argv[5] 
username=sys.argv[6] 
password=sys.argv[7] 

df = sqlContext.read.format("jdbc") \
    .option("url", "{}:{}/{}".format(domain, port, mysqldb)) \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("dbtable", table) \
    .option("user", username) \
    .option("password", password) \
    .load()

#Register dataframe as table 
df.registerTempTable("mytempTable") 

# create hive table from temp table: 
sqlContext.sql("create table {}.{} as select * from mytempTable".format(hivedb,table)) 

sc.stop() 

The pyspark script is invoked from a shell script. I pass a file containing the table names as an argument to this shell script.

The shell script is below.

#!/bin/bash 

source /home/$USER/spark/source.sh 
[ $# -ne 1 ] && { echo "Usage : $0 table ";exit 1; } 

args_file=$1 

TIMESTAMP=`date "+%Y-%m-%d"` 
touch /home/$USER/logs/${TIMESTAMP}.success_log 
touch /home/$USER/logs/${TIMESTAMP}.fail_log 
success_logs=/home/$USER/logs/${TIMESTAMP}.success_log 
failed_logs=/home/$USER/logs/${TIMESTAMP}.fail_log 

#Function to get the status of the job creation 
function log_status 
{ 
     status=$1 
     message=$2 
     if [ "$status" -ne 0 ]; then 
       echo "`date +\"%Y-%m-%d %H:%M:%S\"` [ERROR] $message [Status] $status : failed" | tee -a "${failed_logs}" 
       #echo "Please find the attached log file for more details" 
       exit 1 
       else 
        echo "`date +\"%Y-%m-%d %H:%M:%S\"` [INFO] $message [Status] $status : success" | tee -a "${success_logs}" 
       fi 
} 
while read -r table ;do 
    spark-submit --name "${table}" --master "yarn-client" --num-executors 2 --executor-memory 6g --executor-cores 1 --conf "spark.yarn.executor.memoryOverhead=609" /home/$USER/spark/sql_spark.py ${table} ${hivedb} ${domain} ${port} ${mysqldb} ${username} ${password} > /tmp/logging/${table}.log 2>&1 
    g_STATUS=$? 
    log_status $g_STATUS "Spark job ${table} Execution" 
done < "${args_file}" 

echo "************************************************************************************************************************************************************************" 

With the above shell script I am able to collect a separate log for each individual table in args_file.

Now I have more than 200 tables in MySQL, so I modified the pyspark script as shown below. I created a function that loops through args_file and runs the import for each table.

New pyspark script:

#!/usr/bin/env python 
import sys 
from pyspark import SparkContext, SparkConf 
from pyspark.sql import HiveContext 
conf = SparkConf() 
sc = SparkContext(conf=conf) 
sqlContext = HiveContext(sc) 

#Condition to specify exact number of arguments in the spark-submit command line 
if len(sys.argv) != 8: 
    print "Invalid number of args......" 
    print "Usage: spark-submit import.py Arguments" 
    exit() 
args_file = sys.argv[1] 
hivedb = sys.argv[2] 
domain = sys.argv[3] 
port=sys.argv[4] 
mysqldb=sys.argv[5] 
username=sys.argv[6] 
password=sys.argv[7] 

def testing(table, hivedb, domain, port, mysqldb, username, password): 

    print "*********************************************************table = {} ***************************".format(table) 
    df = sqlContext.read.format("jdbc") \
        .option("url", "{}:{}/{}".format(domain, port, mysqldb)) \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("dbtable", table) \
        .option("user", username) \
        .option("password", password) \
        .load()

    #Register dataframe as table 
    df.registerTempTable("mytempTable") 

    # create hive table from temp table: 
    sqlContext.sql("create table {}.{} stored as parquet as select * from mytempTable".format(hivedb,table)) 

tables = sc.textFile('/user/XXXXXXX/spark_args/%s' % args_file).collect()

for table in tables:
    testing(table, hivedb, domain, port, mysqldb, username, password)

sc.stop() 

Now I want to collect a separate log for each table in args_file, but I am getting only one log file which contains the logs for all the tables.

How can I achieve this? Or is my approach completely wrong?

New shell script:

spark-submit --name "${args_file}" --master "yarn-client" --num-executors 2 --executor-memory 6g --executor-cores 1 --conf "spark.yarn.executor.memoryOverhead=609" /home/$USER/spark/sql_spark.py ${args_file} ${hivedb} ${domain} ${port} ${mysqldb} ${username} ${password} > /tmp/logging/${args_file}.log 2>&1

Are you still using the bash script to invoke the python spark job? – sal


@sal Yes, I am still using the same shell script. –


@sal Please let me know if I should be doing this in a different way. –

Answer


What you can do is write a python script that takes the single log file and splits it into separate log files, starting a new file just before each line that prints the table name.

For example:

*************************************table=table1*************** 

and the next log file would start from

*************************************table=table2**************** 

and so on. You can also use the table name as the file name of each split log. A rough sketch of such a splitter is below.
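
The following is a minimal sketch of that idea, assuming the marker format printed by the testing() function above (a row of asterisks containing table = <name>); the script name split_logs.py, the argument order, and the output directory are only examples, not a fixed interface.

#!/usr/bin/env python
# Split the combined spark-submit log into one log file per table.
# A new output file is opened each time a marker line such as
#   ****...**** table = <table_name> ****...****
# is seen; lines before the first marker are skipped.
import re
import sys

MARKER = re.compile(r'\*+\s*table\s*=\s*(\S+)')

def split_logs(combined_log, out_dir):
    current = None                      # file handle for the table currently being written
    with open(combined_log) as logs:
        for line in logs:
            match = MARKER.search(line)
            if match:
                if current:             # close the previous table's log
                    current.close()
                table = match.group(1)  # use the table name as the file name
                current = open('{0}/{1}.log'.format(out_dir, table), 'w')
            if current:
                current.write(line)
    if current:
        current.close()

if __name__ == '__main__':
    # e.g. python split_logs.py /tmp/logging/args_file.log /tmp/logging
    split_logs(sys.argv[1], sys.argv[2])

Running it once after the spark-submit finishes should leave one <table>.log per table, similar to what the old per-table shell loop produced.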