Skip to content

Commit 9ed4b98

Browse files
Merge pull request #1 from RahulVadisetty91/RahulVadisetty91-patch-1
Enhance FlagAI with AI Features and Modern Code Improvements
2 parents 4798cc6 + 285d2e8 commit 9ed4b98

File tree

1 file changed

+287
-0
lines changed

1 file changed

+287
-0
lines changed

ai_optimization.py

Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
import os
2+
import sys
3+
import json
4+
import subprocess
5+
import collections
6+
import socket
7+
from flagai.logger import log_dist
8+
9+
import signal
10+
import numpy as np
11+
import tensorflow as tf # For AI features like dynamic resource allocation and advanced logging
12+
13+
def launch_cmd(run_cmd):
    """Run a shell command in its own process group and wait for it.

    A SIGINT handler is installed so that Ctrl-C kills the entire process
    group (the command plus any children it spawned) instead of leaving
    orphan processes behind.

    Args:
        run_cmd: The complete shell command line to execute.

    Returns:
        The child process's exit code (previously the function returned
        None; callers that ignore the return value are unaffected).
    """
    # NOTE: shell=True is required because run_cmd is a pre-assembled
    # command line (it may contain `export ...;` prefixes). Callers must
    # not pass untrusted input here.
    p = subprocess.Popen(run_cmd, shell=True, preexec_fn=os.setsid)

    # Renamed first parameter from `signal` to `signum`: the original name
    # shadowed the imported `signal` module inside the handler.
    def signal_handler(signum, frame):
        # Kill the whole process group so children die with the parent.
        os.killpg(os.getpgid(p.pid), 9)

    signal.signal(signal.SIGINT, signal_handler)
    return p.wait()
19+
20+
def fetch_hostfile(hostfile_path):
    """Parse a DeepSpeed-style hostfile into an ordered host->slots map.

    Each non-empty line is expected to look like ``worker-0 slots=16``.
    Malformed lines and duplicate hostnames are logged and skipped.

    Args:
        hostfile_path: Path to the hostfile on disk.

    Returns:
        An ``OrderedDict`` mapping hostname to slot count, or ``None``
        when the hostfile does not exist (training then proceeds with
        local resources only).
    """
    if not os.path.isfile(hostfile_path):
        log_dist("Unable to find hostfile, will proceed with training "
                 "with local resources only.")
        return None

    pool = collections.OrderedDict()
    # e.g., worker-0 slots=16
    with open(hostfile_path, 'r') as handle:
        for raw_line in handle.readlines():
            entry = raw_line.strip()
            if not entry:
                # skip empty lines
                continue
            try:
                host, slot_spec = entry.split()
                _, count_text = slot_spec.split("=")
                count = int(count_text)
            except ValueError as err:
                log_dist(f"Error parsing hostfile line: {entry}. Error: {err}")
                continue  # Skip invalid lines
            if host in pool:
                log_dist(f"Warning: host {host} is already defined in the hostfile.")
                continue
            pool[host] = count

    return pool
46+
47+
def cmd_load_hyperparam(config_path=None, format="json", encoding="utf-8"):
    """Load hyperparameters from a JSON config file as command-line tokens.

    Converts ``{"lr": 0.1, "name": "x"}`` into
    ``['--lr', '0.1', '--name', 'x']``. Keys with falsy values (0, "",
    False, None, ...) become bare flags (``--key`` with no value).

    Args:
        config_path: Path to the configuration file. Required.
        format: Declared config format. NOTE(review): the format is
            actually inferred from the file extension, so this argument is
            effectively unused; kept for interface compatibility.
        encoding: Text encoding used to read the file.

    Returns:
        A list of command-line tokens suitable for extending a launch
        command.

    Raises:
        ValueError: If no path is given or the file is not JSON.
        FileNotFoundError: If the file does not exist.
    """
    if not config_path:
        raise ValueError("Configuration path must be provided.")

    if not os.path.isfile(config_path):
        raise FileNotFoundError(f"Configuration file not found at: {config_path}")

    # Infer the format from the file extension. Assign to a fresh local
    # instead of rebinding the `format` parameter (which shadowed the
    # builtin); the dead `original_format` bookkeeping is removed.
    detected_format = config_path.rsplit('.', 1)[-1]
    with open(config_path, 'r', encoding=encoding) as f:
        if detected_format == "json":
            config_dict = json.load(f)
        else:
            raise ValueError(f"Unsupported format {detected_format} for hyperparam file. Only JSON is supported.")

    config_cmd = []
    for key, value in config_dict.items():
        if value:
            config_cmd.append(f'--{key}')
            config_cmd.append(str(value))
        else:
            # Falsy values are emitted as bare flags with no argument.
            config_cmd.append(f'--{key}')
    return config_cmd
73+
74+
def optimize_resource_allocation(resource_pool, rng=None):
    """AI-driven resource allocation based on current workload and system state.

    Currently a dummy optimization: each host's slot count is scaled by a
    random factor in [0.8, 1.2).

    Args:
        resource_pool: Mapping of hostname -> slot count.
        rng: Optional ``numpy.random.Generator`` for reproducible results;
            a fresh default generator is created when omitted (preserving
            the original behavior).

    Returns:
        A dict mapping hostname -> adjusted slot count (always >= 1).
    """
    if rng is None:
        rng = np.random.default_rng()  # Use the new numpy random generator
    optimized_resources = {}
    for host, slots in resource_pool.items():
        scaled = int(slots * rng.uniform(0.8, 1.2))
        # Clamp to at least one slot: int() truncation could otherwise
        # yield 0 (e.g. slots=1 with a factor < 1.0), which would request
        # no GPUs for that host.
        optimized_resources[host] = max(1, scaled)
    return optimized_resources
84+
85+
def analyze_logs(log_file):
    """Analyze logs to detect patterns or anomalies.

    Placeholder for AI-based log analysis (e.g. using TensorFlow or custom
    algorithms to interpret log data); currently a simple keyword scan.

    Args:
        log_file: Path to the log file to inspect.

    Returns:
        None. Findings are reported through ``log_dist``.
    """
    if not os.path.isfile(log_file):
        log_dist(f"Log file not found at: {log_file}")
        return

    with open(log_file, 'r') as f:
        logs = f.read()

    # Case-insensitive check: loggers conventionally emit "ERROR"/"Error",
    # which the original lowercase-only substring test would have missed.
    if "error" in logs.lower():
        log_dist("Potential issues detected in logs. Review necessary.")
101+
102+
def launch_dist(launcher='distributed_deepspeed',
                num_nodes=1,
                gpus_per_node=1,
                master_addr='localhost',
                master_port=17500,
                hostfile='hostfile',
                nccl_info=False,
                training_script='train.py',
                training_script_paras=None,
                training_paras=None):
    """Launch a (possibly distributed) training job.

    Supported launchers:
      - 'distributed_torch' with num_nodes > 1: one pdsh/ssh command per
        host, each running torch.distributed.launch with its node rank.
      - 'distributed_torch' with num_nodes == 1: a local
        torch.distributed.launch invocation.
      - 'distributed_deepspeed': the `deepspeed` CLI with a hostfile
        (a minimal local hostfile is synthesized when hostfile is None).
      - 'simple_torch' with num_nodes == 1: one plain Python process per
        local GPU, launched sequentially.

    Args:
        launcher: Launch strategy (see above).
        num_nodes: Number of machines participating.
        gpus_per_node: GPUs (processes) per machine.
        master_addr: Rendezvous address for torch.distributed.
        master_port: Rendezvous port.
        hostfile: Path to a DeepSpeed-style hostfile.
        nccl_info: If True, export NCCL debug environment variables.
        training_script: Training entry-point script to run.
        training_script_paras: Extra CLI parameters; a
            'training_script_config' entry followed by a JSON path is
            expanded via cmd_load_hyperparam (both entries are consumed).
        training_paras: Additional raw parameters appended to the command.

    Raises:
        RuntimeError: If fetching the hostfile or optimizing the resource
            allocation fails.
        ValueError: If launcher/num_nodes match no supported mode.
    """
    try:
        resource_pool = fetch_hostfile(hostfile)
        if resource_pool:
            # AI-driven tweak of the per-host slot counts.
            resource_pool = optimize_resource_allocation(resource_pool)
    except Exception as e:
        log_dist(f"Error occurred: {e}")
        raise RuntimeError("Failed during resource allocation or fetching hostfile")

    # NOTE(review): read but never used in this function — confirm whether
    # it should be forwarded to the launched processes or removed.
    cuda_visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES", "0")

    if num_nodes > 1 and launcher == 'distributed_torch':
        # Multi-node torch.distributed: build and run one pdsh/ssh command
        # per host in the resource pool, assigning ranks in pool order.
        node_rank = 0
        for host, slots in resource_pool.items():
            cmd_launch = ['pdsh', '-f', '1024', '-w']
            cmd_launch.append('ssh:' + host)
            cmd_launch.append('"')  # open quote for the remote command string
            if nccl_info:
                cmd_launch.extend([
                    'export NCCL_DEBUG=info;', 'export NCCL_IB_DISABLE=0;',
                    'export NCCL_NET_GDR_LEVEL=2;'
                ])
            cmd_launch.extend([
                'export NUM_NODES=' + str(num_nodes) + ';',
                'export GPUS_PER_NODE=' + str(gpus_per_node) + ';',
                'export NCCL_NET_GDR_LEVEL=2;', sys.executable, '-m',
                'torch.distributed.launch'
            ])
            torch_distributed_args = [
                '--nproc_per_node',
                str(gpus_per_node),
                '--nnodes',
                str(num_nodes),
                '--node_rank',
                str(node_rank),
                '--master_addr',
                master_addr,
                '--master_port',
                str(master_port),
            ]
            cmd_launch.extend(torch_distributed_args)
            cmd_launch.append(training_script)

            # Expand a 'training_script_config' marker plus the JSON path
            # that follows it into explicit CLI flags, consuming both
            # entries from the list.
            # NOTE(review): iterates while deleting from the same list and
            # assumes training_script_paras is not None in this branch —
            # confirm against callers.
            for para in training_script_paras:
                if 'training_script_config' in para:
                    para_index = training_script_paras.index(para)
                    training_script_args = cmd_load_hyperparam(
                        training_script_paras[para_index + 1])
                    cmd_launch.extend(training_script_args)
                    del training_script_paras[para_index:para_index + 2]
            if len(training_script_paras) > 0:
                cmd_launch.extend(training_script_paras)
            cmd_launch.append('--not_call_launch')
            cmd_launch.append('"')  # close quote for the remote command string
            run_cmd = ' '.join(cmd_launch)
            log_dist(run_cmd)
            # Run in its own process group so SIGINT kills the whole group
            # (same pattern as launch_cmd).
            p = subprocess.Popen(run_cmd, shell=True, preexec_fn=os.setsid)

            def signal_handler(signal, frame):
                os.killpg(os.getpgid(p.pid), 9)
            signal.signal(signal.SIGINT, signal_handler)
            p.wait()
            node_rank += 1

    elif num_nodes == 1 and launcher == 'distributed_torch':
        # Single-node torch.distributed launch on the local machine.
        cmd_launch = []
        cmd_launch.extend([
            'export NUM_NODES=' + str(num_nodes) + ';',
            'export GPUS_PER_NODE=' + str(gpus_per_node) + ';', sys.executable,
            '-m', 'torch.distributed.launch'
        ])
        torch_distributed_args = [
            '--nproc_per_node',
            str(gpus_per_node),
            '--nnodes',
            str(num_nodes),
            '--node_rank',
            str(0),
            '--master_addr',
            master_addr,
            '--master_port',
            str(master_port),
        ]
        cmd_launch.extend(torch_distributed_args)
        cmd_launch.append(training_script)
        if training_paras:
            cmd_launch.extend(training_paras)

        cmd_launch.append('--not_call_launch')
        run_cmd = ' '.join(cmd_launch)
        log_dist(run_cmd)

        launch_cmd(run_cmd)

    elif launcher == 'distributed_deepspeed':
        # DeepSpeed launcher: synthesize a minimal local hostfile when
        # none was provided.
        if hostfile is None:
            log_dist(
                'Unable to find hostfile, will proceed with training with local resources only.'
            )

            with open('/tmp/hostfile', 'w') as w:
                w.write(socket.gethostname() + ' slots=2')
            hostfile = '/tmp/hostfile'

        if nccl_info:
            cmd_launch = [
                'NCCL_DEBUG=info NCCL_IB_DISABLE=0 NCCL_NET_GDR_LEVEL=2 deepspeed'
            ]
        else:
            cmd_launch = ['deepspeed']

        cmd_launch.extend([
            '--master_port',
            str(master_port),
            '--num_nodes',
            str(num_nodes),
            '--num_gpus',
            str(gpus_per_node),
            '--hostfile',
            hostfile,
        ])

        cmd_launch.append(training_script)
        if training_script_paras:
            # Same 'training_script_config' expansion as the multi-node
            # branch above.
            for para in training_script_paras:
                if 'training_script_config' in para:
                    para_index = training_script_paras.index(para)
                    training_script_args = cmd_load_hyperparam(
                        training_script_paras[para_index + 1])
                    cmd_launch.extend(training_script_args)
                    del training_script_paras[para_index:para_index + 2]
            if len(training_script_paras) > 0:
                cmd_launch.extend(training_script_paras)

        if training_paras:
            cmd_launch.extend(training_paras)

        cmd_launch.append('--not_call_launch')
        run_cmd = ' '.join(cmd_launch)
        log_dist(run_cmd)
        launch_cmd(run_cmd)

    elif num_nodes == 1 and launcher == 'simple_torch':
        # One plain python process per local GPU, launched one after
        # another (launch_cmd waits for each before starting the next).
        for gpu_id in range(gpus_per_node):
            cmd_launch = []
            cmd_launch.extend([
                'export MASTER_ADDR=' + str(master_addr) + ';',
                'export MASTER_PORT=' + str(master_port) + ';', sys.executable
            ])
            cmd_launch.append(training_script)
            torch_distributed_args = [
                '--gpu_nums',
                str(gpus_per_node), '--local_rank',
                str(gpu_id)
            ]
            cmd_launch.extend(torch_distributed_args)
            # NOTE(review): assumes training_script_paras is not None in
            # this branch — confirm against callers.
            for para in training_script_paras:
                if 'training_script_config' in para:
                    para_index = training_script_paras.index(para)
                    training_script_args = cmd_load_hyperparam(
                        training_script_paras[para_index + 1])
                    cmd_launch.extend(training_script_args)
                    del training_script_paras[para_index:para_index + 2]
            if len(training_script_paras) > 0:
                cmd_launch.extend(training_script_paras)

            if training_paras:
                cmd_launch.extend(training_paras)

            run_cmd = ' '.join(cmd_launch)
            log_dist(run_cmd)
            launch_cmd(run_cmd)
    else:
        raise ValueError('No available launcher')

    # Post-execution log analysis
    # NOTE(review): placeholder path — the analysis is a no-op until this
    # points at the real log location.
    log_file = '/path/to/log/file.log'  # Update with the actual log file path
    analyze_logs(log_file)

0 commit comments

Comments
 (0)