- import subprocess
- import os
- from apex.transformer.testing.commons import TEST_SUCCESS_MESSAGE
def run_gpt(cmd):
    """Launch a GPT test run as a subprocess and parse metrics from its stdout.

    Parameters
    ----------
    cmd : str
        Space-separated command line. It is split naively on single spaces,
        so individual arguments must not contain embedded spaces.

    Returns
    -------
    tuple
        ``(runtime, billion_params, success, errs)`` where ``runtime`` is the
        average iteration time in seconds (0 if never reported),
        ``billion_params`` is the parameter count in units of 1e9 rounded to
        3 decimals, ``success`` is True iff the subprocess printed
        ``TEST_SUCCESS_MESSAGE``, and ``errs`` is the raw stderr bytes.

    Raises
    ------
    SystemExit
        If a reported metric cannot be parsed as a number.
    """
    args = cmd.split(" ")
    p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    outs, errs = p.communicate()
    success = False
    runtime = 0
    num_params = 0
    for out in outs.decode("utf-8").splitlines():
        if "Average Iteration Time:" in out:
            # Text after ": " is the numeric value.
            slicey = out[out.find(":") + 2 :]
            try:
                runtime = float(slicey)
            except ValueError:
                # Narrowed from a bare `except:`. Exit with a nonzero status
                # (the old `quit()` exited with 0 even though parsing failed).
                print(slicey)
                raise SystemExit(1)
        if "Number of Parameters:" in out:
            slicey = out[out.find(":") + 2 :]
            try:
                num_params = int(slicey)
            except ValueError:
                print(slicey)
                raise SystemExit(1)
        if out == str(TEST_SUCCESS_MESSAGE):
            success = True
    return runtime, round(num_params / 10.0 ** 9, 3), success, errs
def plot(runtimes):
    """Scatter-plot training iteration time against model size.

    One scatter series is drawn per distributed setting. The figure is saved
    as ``offload_gpt_scaling.png`` and every PNG in the working directory is
    copied into ``/my_workspace/`` (created if missing).

    Parameters
    ----------
    runtimes : dict
        Maps a distributed-setting label to a mapping of
        ``{billions_of_params: seconds_per_iteration}``.
    """
    # Imported lazily so the rest of the script is usable without matplotlib.
    import matplotlib.pyplot as plt

    for distributed_setting, times in runtimes.items():
        plt.scatter(times.keys(), times.values(), label=distributed_setting)
    plt.legend()
    plt.xlabel("Parameters (Billions)")
    plt.ylabel("Training Iteration time (s)")
    plt.title("GPT Scaling w/ Offloading")
    plt.savefig("offload_gpt_scaling.png")
    plt.close()
    # os.makedirs replaces the shelled-out `mkdir`; exist_ok makes the prior
    # existence check unnecessary and removes the check-then-create race.
    os.makedirs("/my_workspace/", exist_ok=True)
    os.system("cp *.png /my_workspace/")  # NOTE: relies on a POSIX shell
def main():
    """Sweep distributed configurations and model sizes, recording runtimes.

    For each (data, tensor, pipeline) parallelism layout, with and without
    CPU offloading, launch ``run_gpt_minimal_test.py`` over a range of layer
    counts, collect per-iteration times for successful runs, and re-plot the
    accumulated results as the sweep progresses.
    """
    runtimes = {}
    # Layer counts to sweep: fine-grained steps for small models,
    # progressively coarser steps as models grow.
    nlist = (
        list(range(2000, 10000, 2000))
        + list(range(10000, 50000, 5000))
        + list(range(50000, 100000, 10000))
    )
    print("N-List:", nlist)
    for data_parr, tens_parr, pipe_parr in [(8, 1, 1), (4, 2, 1), (2, 1, 4), (1, 2, 4)]:
        for offload in [True, False]:
            dist_setting = (
                f"ddp={data_parr}, tensor_parr={tens_parr}, "
                f"pipe_parr={pipe_parr}, offload={offload}"
            )
            runtimes[dist_setting] = {}
            print("Beginning Testing for", dist_setting)
            for n in nlist:
                # Build the launcher command; always 8 processes per node.
                cmd = (
                    "python3 -m torch.distributed.launch --nproc_per_node=8"
                    " run_gpt_minimal_test.py"
                    f" --micro-batch-size 1 --num-layers {n}"
                    " --hidden-size 128 --num-attention-heads 16"
                    " --max-position-embeddings 128 --seq-length 128"
                    f" --tensor-model-parallel-size {tens_parr}"
                    f" --pipeline-model-parallel-size {pipe_parr}"
                )
                if offload:
                    cmd += " --cpu-offload"
                print(cmd)
                runtime, bill_params, success, errs = run_gpt(cmd)
                if success:
                    runtimes[dist_setting][bill_params] = runtime
                    print(
                        f"{runtime}s per training iter for",
                        f"{bill_params}B parameter GPT-2",
                    )
                    # Refresh the plot periodically once models get large.
                    if n >= 10000:
                        plot(runtimes)
                else:
                    print("GPT-2 w/", n, "layers failed using", dist_setting)
                    print("Moving on to the next distributed setting...")
                    print("#" * 25)
                    print()
                    plot(runtimes)
                    # A failure at n layers implies larger sizes will fail too.
                    break
            print(runtimes)
            plot(runtimes)
# Script entry point: run the full scaling sweep when executed directly.
if __name__ == "__main__":
    main()