0

저는 분산 컴퓨팅 세계에 다소 새로운 점이 있습니다. 나는 공식 텐서 흐름 튜토리얼에서 following을 읽었지 만, 튜토리얼의 주요 예에서 무슨 일이 벌어지고 있는지에 대해서는 상당히 혼란 스러웠다.배포 된 Tensorflow 자습서 예제를 설명 할 수 있습니까?

특히 ps 작업과 근로자는 어떻게 상호 작용합니까? ps 작업의 역할은 정확히 무엇입니까? 코드에서 해당 부분이 상당히 제한되어 있으며 많은 작업을 수행하지 않는 것으로 보입니다. 따라서 용도는 무엇입니까? 분산 시스템의 여러 부분이 어떻게 함께 작동하는지 이해하지 못합니다.

다른 프로세스와 동작의 측면에서 셸 명령을 실행할 때 정확히 무슨 일이 일어날 지 설명 할 수 있다면 좋을 것입니다. 여기

import argparse 
import sys 

import tensorflow as tf 

FLAGS = None 

def main(_): 
    ps_hosts = FLAGS.ps_hosts.split(",") 
    worker_hosts = FLAGS.worker_hosts.split(",") 

    # Create a cluster from the parameter server and worker hosts. 
    cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts}) 

    # Create and start a server for the local task. 
    server = tf.train.Server(cluster, 
          job_name=FLAGS.job_name, 
          task_index=FLAGS.task_index) 

    if FLAGS.job_name == "ps": 
    server.join() 
    elif FLAGS.job_name == "worker": 

    # Assigns ops to the local worker by default. 
    with tf.device(tf.train.replica_device_setter(
     worker_device="/job:worker/task:%d" % FLAGS.task_index, 
     cluster=cluster)): 

     # Build model... 
     loss = ... 
     global_step = tf.contrib.framework.get_or_create_global_step() 

     train_op = tf.train.AdagradOptimizer(0.01).minimize(
      loss, global_step=global_step) 

    # The StopAtStepHook handles stopping after running given steps. 
    hooks=[tf.train.StopAtStepHook(last_step=1000000)] 

    # The MonitoredTrainingSession takes care of session initialization, 
    # restoring from a checkpoint, saving to a checkpoint, and closing when done 
    # or an error occurs. 
    with tf.train.MonitoredTrainingSession(master=server.target, 
              is_chief=(FLAGS.task_index == 0), 
              checkpoint_dir="/tmp/train_logs", 
              hooks=hooks) as mon_sess: 
     while not mon_sess.should_stop(): 
     # Run a training step asynchronously. 
     # See `tf.train.SyncReplicasOptimizer` for additional details on how to 
     # perform *synchronous* training. 
     # mon_sess.run handles AbortedError in case of preempted PS. 
     mon_sess.run(train_op) 

if __name__ == "__main__": 
    parser = argparse.ArgumentParser() 
    parser.register("type", "bool", lambda v: v.lower() == "true") 
    # Flags for defining the tf.train.ClusterSpec 
    parser.add_argument(
     "--ps_hosts", 
     type=str, 
     default="", 
     help="Comma-separated list of hostname:port pairs" 
) 
    parser.add_argument(
     "--worker_hosts", 
     type=str, 
     default="", 
     help="Comma-separated list of hostname:port pairs" 
) 
    parser.add_argument(
     "--job_name", 
     type=str, 
     default="", 
     help="One of 'ps', 'worker'" 
) 
    # Flags for defining the tf.train.Server 
    parser.add_argument(
     "--task_index", 
     type=int, 
     default=0, 
     help="Index of task within the job" 
) 
    FLAGS, unparsed = parser.parse_known_args() 
    tf.app.run(main=main, argv=[sys.argv[0]] + unparsed) 

는 쉘 명령입니다 : 여기

참조를위한 주요 코드

내 이해에 따르면, PS 작업이 다른 작업 사이의 모든 공유 데이터를 포함
$ python trainer.py\ 
    --ps_hosts = ps0.example.com: 2222, ps1.example.com: 2222\ 
    --worker_hosts = worker0.example.com: 2222, worker1.example.com: 2222\ 
    --job_name = ps--task_index = 0# On ps1.example.com: 
    $ python trainer.py\ 
    --ps_hosts = ps0.example.com: 2222, ps1.example.com: 2222\ 
    --worker_hosts = worker0.example.com: 2222, worker1.example.com: 2222\ 
    --job_name = ps--task_index = 1# On worker0.example.com: 
    $ python trainer.py\ 
    --ps_hosts = ps0.example.com: 2222, ps1.example.com: 2222\ 
    --worker_hosts = worker0.example.com: 2222, worker1.example.com: 2222\ 
    --job_name = worker--task_index = 0# On worker1.example.com: 
    $ python trainer.py\ 
    --ps_hosts = ps0.example.com: 2222, ps1.example.com: 2222\ 
    --worker_hosts = worker0.example.com: 2222, worker1.example.com: 2222\ 
    --job_name = worker--task_index = 1 

답변

1

하는 다른 컴퓨터에서 실행될 수 있으며 (모두 동일한 ps 작업을 공유합니다).

+0

값을 업데이트합니다. ps 코드의 작업. 왜 두 개가 있어야합니까? 예를 들어 동일한 코드를 실행했지만 ps 태스크 중 하나를 제거하면 어떻게됩니까? ps 작업이 동기화 용으로 만 존재하는 경우 왜 두 가지가 필요한지 알 수 없습니다. – moxox

2

다음은 상황을 설명하는 다이어그램입니다. 4 가지 텐서 흐름 프로세스가 있습니다. 각 프로세스는 TensorFlow 계산을 실행할 수있는 TensorFlow 작업자 스레드를 실행합니다. 또한 두 프로세스는 session.run 요청을 발행하는 클라이언트 스레드도 실행 중입니다.

각 작업자 프로세스는 장치 위에 분할 그래프 실행의 목적 TensorFlow에서 "장치"이다. 그래프를 작성하는 동안 with tf.device("job:worker/task:0"):과 같은 작업을 수행하여 worker1 장치에서 그래프의 일부를 실행하도록 TF 런타임에 알릴 수 있습니다.

수동으로 with tf.device 주석이 적용되는 tf.train.replica_device_setter에서 마술이 일어나며 장치간에 변수를 자동으로 할당하는 효과가 있습니다. 더 구체적으로 말하면 두 개의 PS 샤드가있을 경우 변수의 절반이 ps1 장치로 이동하고 다른 절반은 ps2 장치로 이동합니다. 한편 그래프의 해당 변수를 업데이트하는 부분은 각 작업자 장치에 복제됩니다. 수동 장치 사양 replica_device_setter를 교체 한 경우

, 당신의 작업자 프로세스는 대략 통신이 자동으로 돌보아이

with tf.device('ps1'): 
    var1 = tf.Variable(...) 
with tf.device('ps2'): 
    var2 = tf.Variable(...) 
with tf.device('worker1'): 
    update_op1 = var1.assign_add(grad1) 
    update_op2 = var2.assign_add(grad2) 

while True: 
    sess.run([update_op1, update_op2]) 

과 같을 것이다. worker1 클라이언트 스레드에서 sess.run(update_op1)을 실행하면 worker1에서 grad1을 계산 한 다음 결과를 ps1 태스크로 보내고 ps1 작업자 스레드를 트리거하여 var1