"""Sweep GPT-2 scaling runs across distributed settings and plot iteration time.

Launches run_gpt_minimal_test.py via torch.distributed.launch for several
(data, tensor, pipeline)-parallel configurations, with and without CPU
offloading, parses the reported iteration time and parameter count from
stdout, and scatter-plots runtime against model size.
"""
import os
import subprocess
import sys

from apex.transformer.testing.commons import TEST_SUCCESS_MESSAGE


def run_gpt(cmd):
    """Run `cmd` and return (runtime, params_in_billions, success, stderr)."""
    p = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    outs, errs = p.communicate()
    success = False
    runtime = 0.0
    num_params = 0
    for out in outs.decode("utf-8").splitlines():
        # Lines of interest look like "Average Iteration Time: <float>" and
        # "Number of Parameters: <int>"; take everything after ": ".
        if "Average Iteration Time:" in out:
            value = out[out.find(":") + 2 :]
            try:
                runtime = float(value)
            except ValueError:
                sys.exit("Could not parse iteration time: " + value)
        if "Number of Parameters:" in out:
            value = out[out.find(":") + 2 :]
            try:
                num_params = int(value)
            except ValueError:
                sys.exit("Could not parse parameter count: " + value)
        if out == str(TEST_SUCCESS_MESSAGE):
            success = True
    return runtime, round(num_params / 10.0 ** 9, 3), success, errs


def plot(runtimes):
    """Scatter-plot iteration time vs. parameter count for each distributed setting."""
    import matplotlib.pyplot as plt

    for distributed_setting, results in runtimes.items():
        plt.scatter(list(results.keys()), list(results.values()), label=distributed_setting)
    plt.legend()
    plt.xlabel("Parameters (Billions)")
    plt.ylabel("Training Iteration Time (s)")
    plt.title("GPT Scaling w/ Offloading")
    plt.savefig("offload_gpt_scaling.png")
    plt.close()
    # Copy the figure(s) to the shared workspace directory.
    os.makedirs("/my_workspace/", exist_ok=True)
    os.system("cp *.png /my_workspace/")


def main():
    runtimes = {}
    # Layer counts to sweep: coarser steps as the models get deeper.
    nlist = (
        list(range(2000, 10000, 2000))
        + list(range(10000, 50000, 5000))
        + list(range(50000, 100000, 10000))
    )
    print("N-List:", nlist)
    # (data-parallel, tensor-parallel, pipeline-parallel) sizes on 8 GPUs.
    for data_parr, tens_parr, pipe_parr in [(8, 1, 1), (4, 2, 1), (2, 1, 4), (1, 2, 4)]:
        for offload in (True, False):
            dist_setting = "ddp={}, tensor_parr={}, pipe_parr={}, offload={}".format(
                data_parr, tens_parr, pipe_parr, offload
            )
            runtimes[dist_setting] = {}
            print("Beginning Testing for", dist_setting)
            for n in nlist:
                cmd = (
                    "python3 -m torch.distributed.launch --nproc_per_node=8"
                    " run_gpt_minimal_test.py"
                    " --micro-batch-size 1 --num-layers {} --hidden-size 128"
                    " --num-attention-heads 16 --max-position-embeddings 128"
                    " --seq-length 128 --tensor-model-parallel-size {}"
                    " --pipeline-model-parallel-size {}".format(n, tens_parr, pipe_parr)
                )
                if offload:
                    cmd += " --cpu-offload"
                print(cmd)
                runtime, bill_params, success, errs = run_gpt(cmd)
                if success:
                    runtimes[dist_setting][bill_params] = runtime
                    print(
                        "{}s per training iter for {}B parameter GPT-2".format(
                            runtime, bill_params
                        )
                    )
                    # Refresh the plot periodically once the models are large.
                    if n >= 10000:
                        plot(runtimes)
                else:
                    # A failed run (e.g. OOM) ends the sweep for this setting.
                    print("GPT-2 w/", n, "layers failed using", dist_setting)
                    print("Moving on to the next distributed setting...")
                    print("#" * 25)
                    print()
                    plot(runtimes)
                    break
    print(runtimes)
    plot(runtimes)


if __name__ == "__main__":
    main()
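# Usage note (a sketch, not part of the harness itself): running this file
# directly sweeps every distributed setting above on an 8-GPU node. The file
# name "gpt_scaling_test.py" below is an assumption, not taken from the source.
#
#   $ python3 gpt_scaling_test.py
#
# run_gpt() parses the stdout of each launched job and expects lines of the
# form printed by run_gpt_minimal_test.py:
#
#   Average Iteration Time: <seconds, float>
#   Number of Parameters: <integer>
#
# followed by TEST_SUCCESS_MESSAGE (imported above) when the run succeeds.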