:orphan:
.. _remote-reference-protocol:

Remote Reference Protocol
-------------------------

This note describes the design details of the Remote Reference protocol and
walks through message flows in different scenarios. Make sure you are familiar
with the :ref:`distributed-rpc-framework` before proceeding.
Background
^^^^^^^^^^

``RRef`` stands for Remote REFerence. It is a reference to an object that is
located on a local or remote worker, and it transparently handles reference
counting under the hood. Conceptually, it can be considered a distributed
shared pointer. Applications can create an ``RRef`` by calling
:meth:`~torch.distributed.rpc.remote`. Each ``RRef`` is owned by the callee
worker of the :meth:`~torch.distributed.rpc.remote` call (i.e., the owner) and
can be used by multiple users. The owner stores the real data and keeps track
of the global reference count. Every ``RRef`` can be uniquely identified by a
global ``RRefId``, which is assigned at the time of creation on the caller of
the :meth:`~torch.distributed.rpc.remote` call.
On the owner worker, there is only one ``OwnerRRef`` instance, which contains
the real data, while on user workers there can be as many ``UserRRef``
instances as necessary, and a ``UserRRef`` does not hold the data. All usage
on the owner retrieves the unique ``OwnerRRef`` instance using the globally
unique ``RRefId``. A ``UserRRef`` is created when it is used as an argument or
return value in a :meth:`~torch.distributed.rpc.rpc_sync`,
:meth:`~torch.distributed.rpc.rpc_async` or
:meth:`~torch.distributed.rpc.remote` invocation, and the owner is notified
accordingly to update the reference count. An ``OwnerRRef`` and its data are
deleted when there are no ``UserRRef`` instances globally and there are no
references to the ``OwnerRRef`` on the owner either.
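The counting scheme above can be sketched as a toy, single-process model. The
``Owner`` class below and all of its method names are invented purely for
illustration; they are not part of the ``torch.distributed.rpc`` API:

```python
# Toy model of the owner-side reference count (illustration only; this is
# NOT the torch.distributed.rpc implementation).

class Owner:
    def __init__(self, data):
        self.data = data          # the real data lives on the owner
        self.user_forks = set()   # ids of known live UserRRef instances
        self.local_refs = 1       # references held by owner-side code

    def add_fork(self, fork_id):
        self.user_forks.add(fork_id)

    def del_fork(self, fork_id):
        self.user_forks.discard(fork_id)
        self._maybe_delete()

    def release_local(self):
        self.local_refs -= 1
        self._maybe_delete()

    def _maybe_delete(self):
        # The data is freed only when there are no UserRRef instances
        # globally and no owner-side references either.
        if not self.user_forks and self.local_refs == 0:
            self.data = None


owner = Owner(data=[1, 2, 3])
owner.add_fork("user_on_A")        # a UserRRef is created on worker A
owner.release_local()              # owner-side code drops its reference
assert owner.data is not None      # A's UserRRef keeps the data alive
owner.del_fork("user_on_A")        # A's UserRRef is destructed
assert owner.data is None          # nothing references the data anymore
```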
Assumptions
^^^^^^^^^^^

The RRef protocol is designed with the following assumptions. User functions
provided to :meth:`~torch.distributed.rpc.rpc_sync`,
:meth:`~torch.distributed.rpc.rpc_async` or
:meth:`~torch.distributed.rpc.remote` are assumed to be non-idempotent and
therefore cannot be retried. However, internal ``RRef`` control messages are
idempotent and are retried upon message failure.

RRef Lifetime
^^^^^^^^^^^^^
The goal of the protocol is to delete an ``OwnerRRef`` at an appropriate time.
The right time to delete an ``OwnerRRef`` is when there are no living
``UserRRef`` instances and user code is not holding references to the
``OwnerRRef`` either. The tricky part is to determine if there are any living
``UserRRef`` instances.
A user can get a ``UserRRef`` in three situations:

1. Receiving a ``UserRRef`` from the owner.
2. Receiving a ``UserRRef`` from another user.
3. Creating a new ``UserRRef`` owned by another worker.

Case 1 is the simplest, where the owner passes its ``RRef`` to a user, e.g.,
the owner calls :meth:`~torch.distributed.rpc.rpc_sync`,
:meth:`~torch.distributed.rpc.rpc_async`, or
:meth:`~torch.distributed.rpc.remote` and uses its ``RRef`` as an argument. In
this case a new ``UserRRef`` will be created on the user. As the owner is the
caller, it can easily update its local reference count on the ``OwnerRRef``.
The only requirement is that any ``UserRRef`` must notify the owner upon
destruction. Hence, we need the first guarantee:

G1. The owner will be notified when any ``UserRRef`` is deleted.

As messages might arrive delayed or out of order, we need one more guarantee
to make sure the delete message is not processed too soon. If A sends a
message to B that involves an ``RRef``, we call the ``RRef`` on A the parent
``RRef`` and the ``RRef`` on B the child ``RRef``.

G2. The parent ``RRef`` will NOT be deleted until the child ``RRef`` is
confirmed by the owner.
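To see why G2 is needed, consider what could happen without it. The sketch
below is a hypothetical, single-process simulation (the ``Owner`` class is
made up, not the real implementation): if the parent's delete message
overtakes the child's fork message, the owner frees the data while a live
child still exists.

```python
# Illustrative sketch of the hazard G2 prevents (NOT the real
# torch.distributed.rpc implementation). Messages may be reordered in
# transit; the owner simply reacts to them in arrival order.

class Owner:
    def __init__(self, data):
        self.data = data
        self.forks = {"parent"}        # the parent UserRRef is known

    def handle(self, msg, fork_id):
        if msg == "fork":
            self.forks.add(fork_id)
        elif msg == "delete":
            self.forks.discard(fork_id)
            if not self.forks:
                self.data = None       # no known forks left: free the data


# Without G2: the parent's delete is processed before the child's fork.
unsafe = Owner(data="tensor")
unsafe.handle("delete", "parent")      # parent dropped right after forking
assert unsafe.data is None             # data freed ...
unsafe.handle("fork", "child")         # ... but a live child arrives later!

# With G2: the parent defers its delete until the child is confirmed.
owner = Owner(data="tensor")
owner.handle("fork", "child")          # child confirmed first (G2)
owner.handle("delete", "parent")       # only now may the parent be deleted
assert owner.data == "tensor"          # the child keeps the data alive
```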
In cases 2 and 3, it is possible that the owner has only partial or no
knowledge at all about the ``RRef`` fork graph. For example, an ``RRef`` could
be constructed on a user, and before the owner receives any RPC call, the
creator user might have already shared the ``RRef`` with other users, and
those users could further share the ``RRef``. One invariant is that the fork
graph of any ``RRef`` is always a tree, because forking an ``RRef`` always
creates a new ``UserRRef`` instance on the callee (except if the callee is the
owner), and hence every ``RRef`` has a single parent.
The owner's view on any ``UserRRef`` in the tree has three stages:

.. code::

    1) unknown -> 2) known -> 3) deleted.
The owner's view of the entire tree keeps changing. The owner deletes its
``OwnerRRef`` instance when it thinks there are no living ``UserRRef``
instances, i.e., when the ``OwnerRRef`` is deleted, all ``UserRRef`` instances
are either indeed deleted or unknown. The dangerous case is when some forks
are unknown and others are deleted.
G2 trivially guarantees that no parent ``UserRRef`` can be deleted before the
owner knows all of its child ``UserRRef`` instances. However, it is possible
that a child ``UserRRef`` is deleted before the owner knows its parent
``UserRRef``.

Consider the following example, where the ``OwnerRRef`` forks to A, then A
forks to Y, and Y forks to Z:
.. code::

    OwnerRRef -> A -> Y -> Z
If all of Z's messages, including the delete message, are processed by the
owner before Y's messages, the owner will learn of Z's deletion before knowing
Y exists. Nevertheless, this does not cause any problem, because at least one
of Y's ancestors (A) will be alive, and it will prevent the owner from
deleting the ``OwnerRRef``. More specifically, if the owner does not know Y, A
cannot be deleted due to G2, and the owner knows A since it is A's parent.
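This reasoning can be simulated in a few lines. The ``Owner`` class below is a
made-up, single-process stand-in for the owner's bookkeeping (not the real
implementation); it shows that processing all of Z's messages before any of
Y's leaves the data intact, because A's fork is still known:

```python
# Illustrative simulation of the OwnerRRef -> A -> Y -> Z example (NOT the
# real torch.distributed.rpc implementation).

class Owner:
    def __init__(self, data):
        self.data = data
        self.forks = set()     # fork ids of UserRRefs known to the owner

    def on_fork(self, fork_id):
        self.forks.add(fork_id)

    def on_delete(self, fork_id):
        self.forks.discard(fork_id)
        if not self.forks:
            self.data = None   # no known live forks: free the data


owner = Owner(data="tensor")
owner.on_fork("A")             # A is known from the original remote call
owner.on_fork("Z")             # Z's fork message overtakes Y's messages
owner.on_delete("Z")           # ... and so does Z's delete message
# Y is still completely unknown to the owner, yet nothing breaks:
assert owner.data == "tensor"  # A is alive and keeps the OwnerRRef alive
# By G2, A cannot be deleted before its child Y is confirmed, so the owner
# necessarily learns about Y before A's delete message can arrive.
owner.on_fork("Y")
owner.on_delete("A")
owner.on_delete("Y")
assert owner.data is None      # now the tree is empty and deletion is safe
```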
Things get a little trickier if the ``RRef`` is created on a user:

.. code::

    OwnerRRef
        ^
        |
        A -> Y -> Z
If Z calls :meth:`~torch.distributed.rpc.RRef.to_here` on the ``UserRRef``,
the owner at least knows A when Z is deleted, because otherwise
:meth:`~torch.distributed.rpc.RRef.to_here` wouldn't finish. If Z does not
call :meth:`~torch.distributed.rpc.RRef.to_here`, it is possible that the
owner receives all messages from Z before any message from A and Y. In this
case, as the real data of the ``OwnerRRef`` has not been created yet, there is
nothing to be deleted either. It is the same as if Z did not exist at all.
Hence, it's still OK.
G1 is implemented by sending out a delete message in the ``UserRRef``
destructor. To provide G2, the parent ``UserRRef`` is put into a context
whenever it is forked, indexed by the new ``ForkId``. The parent ``UserRRef``
is only removed from the context when it receives an acknowledgement message
(ACK) from the child, and the child will only send out the ACK when it is
confirmed by the owner.
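A hypothetical sketch of this caller-side bookkeeping is shown below (the
``UserRRef`` class here is invented for illustration and does not mirror the
real implementation): the parent defers its G1 delete message until user code
has dropped it and every child ``ForkId`` has been ACKed.

```python
# Illustrative sketch of G1/G2 on the caller side (NOT the real
# torch.distributed.rpc implementation).

class UserRRef:
    def __init__(self, rref_id, send_delete):
        self.rref_id = rref_id
        self.pending_children = set()   # ForkIds awaiting confirmation
        self.dropped = False            # user code no longer references it
        self._send_delete = send_delete

    def fork(self, fork_id):
        # Parking the parent here provides G2: it stays alive until the
        # child fork is confirmed by the owner.
        self.pending_children.add(fork_id)

    def child_ack(self, fork_id):
        # The child sends this ACK only after the owner confirmed it.
        self.pending_children.discard(fork_id)
        self._maybe_release()

    def drop(self):
        self.dropped = True
        self._maybe_release()

    def _maybe_release(self):
        if self.dropped and not self.pending_children:
            self._send_delete(self.rref_id)   # G1: notify the owner


deleted = []
rref = UserRRef(rref_id=100, send_delete=deleted.append)
rref.fork("child-1")      # rref is forwarded to another worker
rref.drop()               # user code drops rref before the child is ACKed
assert deleted == []      # G2: the delete message is withheld
rref.child_ack("child-1")
assert deleted == [100]   # now the owner is notified (G1)
```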
Protocol Scenarios
^^^^^^^^^^^^^^^^^^

Let's now discuss how the above designs translate to the protocol in four
scenarios.
User Share RRef with Owner as Return Value
""""""""""""""""""""""""""""""""""""""""""

.. code::

    import torch
    import torch.distributed.rpc as rpc

    # on worker A
    rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
    rref.to_here()
In this case, the ``UserRRef`` is created on the user worker A, then it is
passed to the owner worker B together with the remote message, and then B
creates the ``OwnerRRef``. The method :meth:`~torch.distributed.rpc.remote`
returns immediately, meaning that the ``UserRRef`` can be forked/used before
the owner knows about it.

On the owner, when receiving the :meth:`~torch.distributed.rpc.remote` call,
it will create the ``OwnerRRef`` and return an ACK to acknowledge ``{100, 1}``
(``RRefId``, ``ForkId``). Only after receiving this ACK can A delete its
``UserRRef``. This involves both G1 and G2. G1 is obvious. For G2, the
``OwnerRRef`` is a child of the ``UserRRef``, and the ``UserRRef`` is not
deleted until it receives the ACK from the owner.
.. image:: https://user-images.githubusercontent.com/16999635/69164772-98181300-0abe-11ea-93a7-9ad9f757cd94.png
    :alt: user_to_owner_ret.png
    :width: 500 px
The diagram above shows the message flow, where solid arrows contain user
functions and dashed arrows are builtin messages. Note that the first two
messages from A to B (:meth:`~torch.distributed.rpc.remote` and
:meth:`~torch.distributed.rpc.RRef.to_here`) may arrive at B in any order, but
the final delete message will only be sent out when:
1. ``UserRRef {100, 1}`` is confirmed by the owner (G2), and
2. A deletes its local ``UserRRef`` instance. This occurs when the ``RRef`` is
   no longer in scope and is eligible for garbage collection.

User Share RRef with Owner as Argument
""""""""""""""""""""""""""""""""""""""

.. code::

    import torch
    import torch.distributed.rpc as rpc

    # on worker A
    def func(rref):
        pass

    rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
    rpc.rpc_async('B', func, args=(rref, ))
In this case, after creating the ``UserRRef`` on A, A uses it as an argument
in a follow-up RPC call to B. A will keep ``UserRRef {100, 1}`` alive until it
receives the acknowledgement from B (G2, not the return value of the RPC
call). This is necessary because A should not send out the delete message
until all previous messages have been received; otherwise, the ``OwnerRRef``
could be deleted before usage, as we do not guarantee message delivery order.
This is done by creating a child ``ForkId`` of the ``RRef`` and holding it in
a map until the owner confirms the child ``ForkId``. The figure below shows
the message flow.
.. image:: https://user-images.githubusercontent.com/16999635/69164845-b67e0e80-0abe-11ea-93fa-d24674e75a2b.png
    :alt: user_to_owner_arg.png
    :width: 500 px
Note that the ``UserRRef`` could be deleted on B before ``func`` finishes or
even starts. However, this is OK, as at the time B sends out the ACK for the
child ``ForkId``, it has already acquired the ``OwnerRRef`` instance, which
prevents it from being deleted too soon.
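This keep-alive reasoning can be sketched with a made-up refcount model
(illustration only, not the real implementation): B acquires a reference to
the ``OwnerRRef`` when it ACKs the child ``ForkId``, so deleting the
``UserRRef`` argument early cannot free the data before ``func`` runs.

```python
# Illustrative refcount sketch (NOT the real torch.distributed.rpc
# implementation): the pending call's reference outlives the argument.

class OwnerRRef:
    def __init__(self, data):
        self.data = data
        self.refcount = 0

    def acquire(self):
        self.refcount += 1
        return self

    def release(self):
        self.refcount -= 1
        if self.refcount == 0:
            self.data = None   # no references left: delete the data


owner = OwnerRRef("result")
pending = owner.acquire()      # taken by B when it ACKs the child ForkId
arg_ref = owner.acquire()      # the UserRRef argument on B ...
arg_ref.release()              # ... is deleted before func even starts
assert owner.data == "result"  # OK: the pending call still holds a ref
pending.release()              # func finished, B releases its reference
assert owner.data is None
```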
Owner Share RRef with User
""""""""""""""""""""""""""

Owner to user is the simplest case, where the owner can update reference
counting locally and does not need any additional control messages to notify
others. Regarding G2, it is the same as if the parent received the ACK from
the owner immediately, since the parent is the owner.
.. code::

    import torch
    from torch.distributed.rpc import RRef
    import torch.distributed.rpc as rpc

    # on worker B
    def func(rref):
        pass

    rref = RRef("data")
    rpc.rpc_async('C', func, args=(rref, ))
.. image:: https://user-images.githubusercontent.com/16999635/69164921-c990de80-0abe-11ea-9250-d32ad00cf4ae.png
    :alt: owner_to_user.png
    :width: 500 px
The figure above shows the message flow. Note that when the ``OwnerRRef``
exits scope after the ``rpc_async`` call, it will not be deleted, because
internally there is a map that keeps it alive if there are any known forks,
which in this case is ``UserRRef {100, 1}`` (G2).
User Share RRef with User
"""""""""""""""""""""""""

This is the most complicated case, where the caller user (parent
``UserRRef``), the callee user (child ``UserRRef``), and the owner all need to
get involved.
.. code::

    import torch
    import torch.distributed.rpc as rpc

    # on worker A
    def func(rref):
        pass

    rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
    rpc.rpc_async('C', func, args=(rref, ))
.. image:: https://user-images.githubusercontent.com/16999635/69164971-d6adcd80-0abe-11ea-971d-6b7af131f0fd.png
    :alt: user_to_user.png
    :width: 500 px
When C receives the child ``UserRRef`` from A, it sends out a fork request to
the owner B. Later, when B confirms the ``UserRRef`` on C, C will perform two
actions in parallel: 1) send out the child ACK to A, and 2) run the
user-provided function. During this time, the parent (A) will hold its
``UserRRef {100, 1}`` alive to achieve G2.