Переглянути джерело

chore: use fork as the default method for mp backend

AlpinDale 7 місяців тому
батько
коміт
017b42c517

+ 27 - 1
aphrodite/distributed/device_communicators/custom_all_reduce_utils.py

@@ -1,6 +1,9 @@
 import ctypes
 import json
 import os
+import pickle
+import subprocess
+import sys
 from itertools import product
 from typing import Dict, Optional, Sequence
 
@@ -195,7 +198,25 @@ def gpu_p2p_access_check(src: int, tgt: int) -> bool:
         ids = list(range(num_dev))
         # batch of all pairs of GPUs
         batch_src, batch_tgt = zip(*list(product(ids, ids)))
-        result = can_actually_p2p(batch_src, batch_tgt)
+        # NOTE: we use `subprocess` rather than `multiprocessing` here
+        # because the caller might not have `if __name__ == "__main__":`,
+        # in that case we cannot use spawn method in multiprocessing.
+        # However, `can_actually_p2p` requires spawn method.
+        # The fix is, we use `subprocess` to call the function,
+        # where we have `if __name__ == "__main__":` in this file.
+        input_bytes = pickle.dumps((batch_src, batch_tgt))
+        returned = subprocess.run([sys.executable, __file__],
+                                  input=input_bytes,
+                                  capture_output=True)
+        # check if the subprocess is successful
+        try:
+            returned.check_returncode()
+        except Exception as e:
+            # wrap raised exception to provide more information
+            raise RuntimeError(
+                f"Error happened when batch testing "
+                f"peer-to-peer access from {batch_src} to {batch_tgt}") from e
+        result = pickle.loads(returned.stdout)
         for _i, _j, r in zip(batch_src, batch_tgt, result):
             cache[f"{_i}->{_j}"] = r
         with open(path, "w") as f:
@@ -210,3 +231,8 @@ def gpu_p2p_access_check(src: int, tgt: int) -> bool:
 
 
 __all__ = ["gpu_p2p_access_check"]
+
+if __name__ == "__main__":
+    batch_src, batch_tgt = pickle.loads(sys.stdin.buffer.read())
+    result = can_actually_p2p(batch_src, batch_tgt)
+    sys.stdout.buffer.write(pickle.dumps(result))

+ 1 - 1
aphrodite/executor/multiproc_worker_utils.py

@@ -25,7 +25,7 @@ JOIN_TIMEOUT_S = 2
 
 # Use dedicated multiprocess context for workers.
 # Both spawn and fork work
-mp_method = os.getenv("APHRODITE_WORKER_MULTIPROC_METHOD", "spawn")
+mp_method = os.getenv("APHRODITE_WORKER_MULTIPROC_METHOD", "fork")
 mp = multiprocessing.get_context(mp_method)