Spark: running spark-submit with the correct number of executors

I've set up a basic 3-node EMR cluster and run spark-submit with only an --executor-memory setting, and no other configuration.

The script itself is a basic benchmarking task:

from pyspark import SparkConf, SparkContext 
from pyspark.sql import SQLContext, Row 
import time 

conf = SparkConf() 
sc = SparkContext(conf=conf) 
sqlContext = SQLContext(sc) 

# sample data in lineitem table: 
# 3|1284483|34508|3|27|39620.34|0.06|0.07|A|F|1994-01-16|1993-11-22|1994-01-23|DELIVER IN PERSON|SHIP|nal foxes wake. | 
def mapper(lines): 
    x = lines.split("|") 
    return Row(rownum=int(x[0]), 
     l_orderkey=int(x[0]), 
     l_partkey=int(x[1]), 
     l_suppkey=int(x[2]), 
     l_linenumber=int(x[3]), 
     l_quantity=int(x[4]), 
     l_extendedprice=float(x[5]), 
     l_discount=float(x[6]), 
     l_tax=float(x[7]), 
     l_returnflag=x[8], 
     l_linestatus=x[9], 
     l_shipdate=x[10], 
     l_commitdate=x[11], 
     l_receiptdate=x[12], 
     l_shipinstruct=x[13], 
     l_shipmode=x[14], 
     l_comment=x[15], 
    ) 

# ORDERKEY 
# PARTKEY 
# SUPPKEY 
# LINENUMBER 
# QUANTITY 
# EXTENDEDPRICE 
# DISCOUNT 
# TAX 
# RETURNFLAG 
# LINESTATUS 
# SHIPDATE 
# COMMITDATE 
# RECEIPTDATE 
# SHIPINSTRUCT 
# SHIPMODE 
# COMMENT 
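# Quick sanity check of the mapper against the sample line above -- an
# illustrative addition, not part of the original script:
sample_line = "3|1284483|34508|3|27|39620.34|0.06|0.07|A|F|1994-01-16|1993-11-22|1994-01-23|DELIVER IN PERSON|SHIP|nal foxes wake. |"
row = mapper(sample_line)
assert row.l_orderkey == 3 and row.l_shipmode == "SHIP"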

rdd = sc.textFile("s3://sampletpchdata/10gb/lineitem.tbl.*") 

# kick off an initial count 
print(rdd.count()) 
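
# The partition count of the input determines how many tasks the count job
# runs with; worth printing when investigating parallelism (illustrative
# addition, not part of the original script):
print(rdd.getNumPartitions())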

sample = rdd.map(mapper) 

schemaSample = sqlContext.createDataFrame(sample) 
schemaSample.registerTempTable("lineitem") 

# run TPCH query 1 
results = sqlContext.sql(""" 
SELECT 
    l_returnflag, 
    l_linestatus, 
    sum(l_quantity) as sum_qty, 
    sum(l_extendedprice) as sum_base_price, 
    sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, 
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, 
    avg(l_quantity) as avg_qty, 
    avg(l_extendedprice) as avg_price, 
    avg(l_discount) as avg_disc, 
    count(*) as count_order 
from 
    lineitem 
where 
    l_shipdate <= date_sub(cast('1998-12-01' as date), 60) 
group by 
    l_returnflag, 
    l_linestatus 
order by 
    l_returnflag, 
    l_linestatus 
""") 

# kick off a final count of the results 
print(results.count()) 

While this was going on, I looked at the executors endpoint of the Spark API and got this result:

[ { 
    "id" : "driver", 
    "hostPort" : "10.232.13.130:47656", 
    "rddBlocks" : 0, 
    "memoryUsed" : 0, 
    "diskUsed" : 0, 
    "activeTasks" : 0, 
    "failedTasks" : 0, 
    "completedTasks" : 0, 
    "totalTasks" : 0, 
    "totalDuration" : 0, 
    "totalInputBytes" : 0, 
    "totalShuffleRead" : 0, 
    "totalShuffleWrite" : 0, 
    "maxMemory" : 7975010304, 
    "executorLogs" : { } 
}, { 
    "id" : "1", 
    "hostPort" : "ip-10-232-13-123.us-west-1.compute.internal:58544", 
    "rddBlocks" : 0, 
    "memoryUsed" : 0, 
    "diskUsed" : 0, 
    "activeTasks" : 0, 
    "failedTasks" : 0, 
    "completedTasks" : 641, 
    "totalTasks" : 641, 
    "totalDuration" : 4998902, 
    "totalInputBytes" : 3490792, 
    "totalShuffleRead" : 0, 
    "totalShuffleWrite" : 395870, 
    "maxMemory" : 7790985216, 
    "executorLogs" : { 
    "stdout" : "http://somenode:8042/node/containerlogs/container_1456781958356_0004_01_000009/hadoop/stdout?start=-4096", 
    "stderr" : "http://somenode:8042/node/containerlogs/container_1456781958356_0004_01_000009/hadoop/stderr?start=-4096" 
    } 
} ] 
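
For reference, that JSON comes from Spark's REST API. A minimal sketch of fetching it from the driver node -- the localhost URL, the default UI port 4040, and the use of sc.applicationId are my assumptions, not details from the original post:

import json 
import urllib2  # Python 2, matching the script above 

# the running application's ID, exposed on the SparkContext 
app_id = sc.applicationId 
url = "http://localhost:4040/api/v1/applications/%s/executors" % app_id 
executors = json.load(urllib2.urlopen(url)) 
print(len(executors))  # one entry for the driver plus one per executor 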

Unless I'm misreading this result, it looks like there is only one driver and one executor in my 3-node cluster. Is that what's happening? If so, shouldn't there be more executors than this? And how do I get them?

Answer


You also have to use --num-executors to choose the number of executors your code runs on.
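
For example, on a 3-node EMR cluster where one node is the master and two are core nodes, you might request one executor per core node. A minimal sketch, assuming YARN (which EMR uses; --num-executors only applies there) -- the script name and all sizing values below are illustrative placeholders, not taken from the question:

# on the command line: 
#   spark-submit --num-executors 2 --executor-cores 4 --executor-memory 4g benchmark.py 
# or set the same properties on the SparkConf before creating the SparkContext: 
from pyspark import SparkConf, SparkContext 

conf = (SparkConf() 
        .set("spark.executor.instances", "2")  # the property behind --num-executors 
        .set("spark.executor.cores", "4") 
        .set("spark.executor.memory", "4g")) 
sc = SparkContext(conf=conf) 

Note that properties set directly on a SparkConf in code take precedence over the equivalent spark-submit flags, so it is best to pick one place to configure them.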