我編寫了一個分佈式 TensorFlow 程序,其中包含 1 個 ps 作業和 2 個 worker 作業。我原本預期數據批次會被分配給各個 worker,但實際上似乎並非如此:只有一個 worker(task = 0)在做所有的工作,而另一個 worker 處於閒置狀態。能否請你幫我找出這個程序的問題?(標題:數據沒有在 TensorFlow 的 worker 之間分配)
import math
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
# Command-line flags.  The first group carries the host lists used to build
# the tf.train.ClusterSpec; the second group identifies this process for the
# tf.train.Server and sets the model/training options.
_flags = tf.app.flags

# Flags for defining the tf.train.ClusterSpec
_flags.DEFINE_string(
    "ps_hosts", "", "Comma-separated list of hostname:port pairs")
_flags.DEFINE_string(
    "worker_hosts", "", "Comma-separated list of hostname:port pairs")
_flags.DEFINE_string(
    "master_hosts", "oser502110:2222",
    "Comma-separated list of hostname:port pairs")

# Flags for defining the tf.train.Server
_flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
_flags.DEFINE_integer("task_index", 0, "Index of task within the job")
_flags.DEFINE_integer(
    "hidden_units", 100, "Number of units in the hidden layer of the NN")
_flags.DEFINE_string(
    "data_dir", "/tmp/mnist-data", "Directory for storing mnist data")
_flags.DEFINE_integer("batch_size", 100, "Training batch size")
_flags.DEFINE_string("worker_grpc_url", None, "Worker GRPC URL")

# Parsed flag values, read by main().
FLAGS = _flags.FLAGS

# Side length of one input image; main() uses IMAGE_PIXELS * IMAGE_PIXELS as
# the width of the flattened network input.
IMAGE_PIXELS = 28
def main(_):
    """Run this process as one task of the distributed MNIST training job.

    Builds a ``tf.train.ClusterSpec`` from ``--ps_hosts`` / ``--worker_hosts``
    and starts the ``tf.train.Server`` for ``--job_name`` / ``--task_index``.
    A ``ps`` task then blocks in ``server.join()``.  A ``worker`` task builds
    a one-hidden-layer softmax classifier, obtains a session through a
    ``tf.train.Supervisor`` and runs training steps until the supervisor
    asks to stop or the ``global_step`` value returned by a training step
    reaches 1000000.

    Every worker process loads MNIST itself and draws its own batches with
    ``mnist.train.next_batch``; nothing in this function partitions the data
    between workers.

    Args:
        _: Unused positional argument (presumably the argv list handed over
            by ``tf.app.run`` -- confirm against the TensorFlow version in
            use).
    """
    ps_hosts = FLAGS.ps_hosts.split(",")
    worker_hosts = FLAGS.worker_hosts.split(",")
    cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

    # Create and start a server for the local task.
    server = tf.train.Server(cluster,
                             job_name=FLAGS.job_name,
                             task_index=FLAGS.task_index)

    if FLAGS.job_name == "ps":
        server.join()
    elif FLAGS.job_name == "worker":
        # Task 0 acts as the chief for the Supervisor below.
        is_chief = (FLAGS.task_index == 0)
        if is_chief:
            tf.reset_default_graph()

        # Assigns ops to the local worker by default.
        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/task:%d" % FLAGS.task_index,
                cluster=cluster)):
            # Variables of the hidden layer
            hid_w = tf.Variable(
                tf.truncated_normal(
                    [IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units],
                    stddev=1.0 / IMAGE_PIXELS),
                name="hid_w")
            hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")

            # Variables of the softmax layer
            sm_w = tf.Variable(
                tf.truncated_normal(
                    [FLAGS.hidden_units, 10],
                    stddev=1.0 / math.sqrt(FLAGS.hidden_units)),
                name="sm_w")
            sm_b = tf.Variable(tf.zeros([10]), name="sm_b")

            # Inputs: flattened images and one-hot labels.
            x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])
            y_ = tf.placeholder(tf.float32, [None, 10])

            hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
            hid = tf.nn.relu(hid_lin)

            y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
            # Cross-entropy; the clip keeps tf.log away from log(0).
            loss = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))

            global_step = tf.Variable(0, trainable=False)

            train_op = tf.train.AdagradOptimizer(0.01).minimize(
                loss, global_step=global_step)

            saver = tf.train.Saver()
            # Non-deprecated spelling of tf.initialize_all_variables().
            init_op = tf.global_variables_initializer()

        # Create a "supervisor", which oversees the training process.
        sv = tf.train.Supervisor(is_chief=is_chief,
                                 logdir="/tmp/train_logs",
                                 init_op=init_op,
                                 recovery_wait_secs=1,
                                 saver=saver,
                                 global_step=global_step,
                                 save_model_secs=600)

        if is_chief:
            print("Worker %d: Initializing session..." % FLAGS.task_index)
        else:
            print("Worker %d: Waiting for session to be initialized..."
                  % FLAGS.task_index)

        # Each worker process reads the dataset on its own.
        mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)

        # The session is configured with device filters for the ps job and
        # this worker's own task only.
        sess_config = tf.ConfigProto(
            allow_soft_placement=True,
            log_device_placement=True,
            device_filters=["/job:ps",
                            "/job:worker/task:%d" % FLAGS.task_index])

        # The supervisor takes care of session initialization, restoring from
        # a checkpoint, and closing when done or an error occurs.
        with sv.prepare_or_wait_for_session(server.target,
                                            config=sess_config) as sess:
            print("Worker %d: Session initialization complete."
                  % FLAGS.task_index)
            # Loop until the supervisor shuts down or 1000000 steps have
            # completed.
            step = 0
            while not sv.should_stop() and step < 1000000:
                # Run a training step asynchronously.
                # See `tf.train.SyncReplicasOptimizer` for additional details
                # on how to perform *synchronous* training.
                batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size)
                print("FETCHING NEXT BATCH %d" % FLAGS.batch_size)
                train_feed = {x: batch_xs, y_: batch_ys}

                # `step` is the fetched global_step value, not a local
                # iteration counter.
                _, step = sess.run([train_op, global_step],
                                   feed_dict=train_feed)
                if step % 100 == 0:
                    print("Done step %d" % step)

        # Ask for all the services to stop.
        sv.stop()
if __name__ == "__main__":
    # Script entry point: hand control to tf.app.run(), which is expected to
    # parse the command-line flags and then call main().
    tf.app.run()
以下是 worker(task = 0)的日誌:
2017-06-20 04:50:58.405431: I tensorflow/core/common_runtime/simple_placer.cc:841] Adagrad/value: (Const)/job:ps/replica:0/task:0/cpu:0
truncated_normal/stddev: (Const): /job:worker/replica:0/task:0/gpu:0
2017-06-20 04:50:58.405456: I tensorflow/core/common_runtime/simple_placer.cc:841] truncated_normal/stddev: (Const)/job:worker/replica:0/task:0/gpu:0
truncated_normal/mean: (Const): /job:worker/replica:0/task:0/gpu:0
2017-06-20 04:50:58.405481: I tensorflow/core/common_runtime/simple_placer.cc:841] truncated_normal/mean: (Const)/job:worker/replica:0/task:0/gpu:0
truncated_normal/shape: (Const): /job:worker/replica:0/task:0/gpu:0
2017-06-20 04:50:58.405506: I tensorflow/core/common_runtime/simple_placer.cc:841] truncated_normal/shape: (Const)/job:worker/replica:0/task:0/gpu:0
Worker 0: Session initialization complete.
FETCHING NEXT BATCH 500
FETCHING NEXT BATCH 500
FETCHING NEXT BATCH 500
FETCHING NEXT BATCH 500
FETCHING NEXT BATCH 500
FETCHING NEXT BATCH 500
Done step 408800
... ...
但是第 2 個 worker(task = 1)的日誌看起來是這樣的:
2017-06-20 04:51:07.288600: I tensorflow/core/common_runtime/simple_placer.cc:841] zeros: (Const)/job:worker/replica:0/task:1/gpu:0
Adagrad/value: (Const): /job:ps/replica:0/task:0/cpu:0
2017-06-20 04:51:07.288614: I tensorflow/core/common_runtime/simple_placer.cc:841] Adagrad/value: (Const)/job:ps/replica:0/task:0/cpu:0
truncated_normal/stddev: (Const): /job:worker/replica:0/task:1/gpu:0
2017-06-20 04:51:07.288639: I tensorflow/core/common_runtime/simple_placer.cc:841] truncated_normal/stddev: (Const)/job:worker/replica:0/task:1/gpu:0
truncated_normal/mean: (Const): /job:worker/replica:0/task:1/gpu:0
2017-06-20 04:51:07.288664: I tensorflow/core/common_runtime/simple_placer.cc:841] truncated_normal/mean: (Const)/job:worker/replica:0/task:1/gpu:0
truncated_normal/shape: (Const): /job:worker/replica:0/task:1/gpu:0
2017-06-20 04:51:07.288689: I tensorflow/core/common_runtime/simple_placer.cc:841] truncated_normal/shape: (Const)/job:worker/replica:0/task:1/gpu:0
我原本預期兩個 worker 會有類似的日誌。請幫我理解為什麼會出現這種情況。期待您的幫助。