Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions tf/ansible/files/scripts/cluster_util_condor.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/bin/bash
# Details: This script is used to monitor the entire HTCondor cluster usage independent of GalaxyGroup's

# Total number of detected CPUs at the machine level
total_detected_cpus=$(condor_status -af Cpus | paste -s -d'+' | bc)

# Claimed CPUs
claimed_cpus=$(condor_status -af Cpus -constraint 'State == "Claimed"' | paste -s -d'+' | bc)

# Unclaimed CPUs
unclaimed_cpus=$(condor_status -af Cpus -constraint 'State == "Unclaimed"' | paste -s -d'+' | bc)

# Total memory at the machine level
total_memory=$(condor_status -af Memory | paste -s -d'+' | bc)

# Claimed memory
claimed_memory=$(condor_status -af Memory -constraint 'State == "Claimed"' | paste -s -d'+' | bc)

# Unclaimed memory
unclaimed_memory=$(condor_status -af Memory -constraint 'State == "Unclaimed"' | paste -s -d'+' | bc)

# Total number of GPU slots
total_gpu_slots=$(condor_status -af Name -constraint 'CUDADeviceName =!= undefined' | wc -l)

# Claimed GPUs slots
claimed_gpus=$(condor_status -af Name -constraint 'State == "Claimed" && CUDADeviceName =!= undefined' | wc -l)

# Unclaimed GPUs slots
unclaimed_gpus=$(condor_status -af Name -constraint 'State == "Unclaimed" && CUDADeviceName =!= undefined' | wc -l)

# Total load average at the machine level
total_loadavg=$(condor_status -af LoadAvg | paste -s -d'+' | bc)

# Claimed load average
claimed_loadavg=$(condor_status -af LoadAvg -constraint 'State == "Claimed"' | paste -s -d'+' | bc)

# Unclaimed load average
unclaimed_loadavg=$(condor_status -af LoadAvg -constraint 'State == "Unclaimed"' | paste -s -d'+' | bc)

# Total number of slots
total_slots=$(condor_status -af Name | wc -l)

# Total number of Claimed slots with Activity Busy
claimed_busy_slots=$(condor_status -af Name -constraint 'State == "Claimed" && Activity == "Busy"' | wc -l)

# Total number of Unclaimed slots with Activity Idle
unclaimed_idle_slots=$(condor_status -af Name -constraint 'State == "Unclaimed" && Activity == "Idle"' | wc -l)

# Output in influxdb protocol format
echo "htcondor_cluster_usage,classad='machine' total_detected_cpus=$total_detected_cpus,claimed_cpus=$claimed_cpus,unclaimed_cpus=$unclaimed_cpus,total_memory=$total_memory,claimed_memory=$claimed_memory,unclaimed_memory=$unclaimed_memory,total_loadavg=$total_loadavg,claimed_loadavg=$claimed_loadavg,unclaimed_loadavg=$unclaimed_loadavg,total_slots=$total_slots,claimed_busy_slots=$claimed_busy_slots,unclaimed_idle_slots=$unclaimed_idle_slots,total_gpu_slots=$total_gpu_slots,claimed_gpus=$claimed_gpus,unclaimed_gpus=$unclaimed_gpus"

122 changes: 122 additions & 0 deletions tf/ansible/files/scripts/producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
"""This script is a producer that publishes condor status details to the queue"""

import argparse
import os
import re
import subprocess
import sys
import time
import yaml
from kombu import Connection, Exchange, Producer, Queue


def get_amqp_url(pulsar_app_file: str) -> None:
"""
Parse the Pulsar app.yml file and extract the AMQP URL.
"""
# Check the existence of the job_conf.yml file
if not os.path.exists(pulsar_app_file):
print(f"File {pulsar_app_file} does not exist.")
return None

with open(pulsar_app_file, "r") as file:
app_conf = yaml.safe_load(file)
amqp_url = app_conf['message_queue_url']

return amqp_url


def get_vhost_name(amqp_url: str) -> str:
"""
Parse the AMQP URL to extract the vhost name.
"""
vhost = amqp_url.split("/")[-1].split("?")[0]
return vhost


def connect_to_queue(amqp_url: str) -> Connection:
"""
Connect to the AMQP queue using the provided URL.
"""
# With try and except block, connect to the AMQP queue using the provided URL and manage the error if the connection fails
try:
connection = Connection(amqp_url)
connection.ensure_connection(max_retries=3)
return connection
except Exception as e:
print(f"Error connecting to the AMQP queue: {e}")
return None


def process_condor_status_output(condor_status_output: str) -> str:
"""
Replace empty/None values with 0 in the condor status output
"""
processed_output = re.sub(r'(\w+)=,', r'\1=0,', condor_status_output)
return processed_output


def get_condor_status(cluster_status_script_file: str) -> str:
"""
Get condor status from shell script output
"""
condor_metrics = process_condor_status_output(subprocess.check_output(["sh", cluster_status_script_file]).decode("utf-8").strip())

# Add timestamp
now = time.time()
condor_metrics = f"{condor_metrics},querytime={now}"
return condor_metrics


def produce_message(amqp_url: str, condor_metrics: str) -> None:
"""
Produce and publish messages to the queue.
"""
connection = connect_to_queue(amqp_url)

if connection:
vhost = get_vhost_name(amqp_url)
routing_key = f"{vhost}-condor"
channel = connection.channel()
exchange = Exchange(f"{vhost}-condor-exchange", type="direct")
producer = Producer(exchange=exchange, channel=channel, routing_key=routing_key)
queue = Queue(name=f"{vhost}-condor-stats", exchange=exchange, routing_key=routing_key)
queue.maybe_bind(connection)
queue.declare()

# Add destination to metrics
condor_metrics = f"{condor_metrics},destinationd_id={vhost}"

producer.publish(
{"condor_metrics": condor_metrics},
exchange=exchange,
routing_key=routing_key,
declare=[queue],
serializer="json"
)
connection.release()


def main(pulsar_app_file: str, cluster_status_script_file: str) -> None:

amqp_url = get_amqp_url(pulsar_app_file)

if not amqp_url:
print("No Pulsar url found in the pulsar configuration file.")
sys.exit(1)

# Get the condor status
condor_metrics = get_condor_status(cluster_status_script_file)

# Create and publish message
produce_message(amqp_url, condor_metrics)


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Produce messages to AMQP queues.")
parser.add_argument("pulsar_app_file", type=str, help="Path to the Pulsar app configuration file (YAML).")
parser.add_argument("cluster_status_script_file", type=str, help="Path to the shell script that produces influx compatible condor status metrics.")
args = parser.parse_args()

main(args.pulsar_app_file, args.cluster_status_script_file)

3 changes: 3 additions & 0 deletions tf/ansible/group_vars/all
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,6 @@ name_prefix: "vgcn-mira-"
name_suffix: ".pulsar"
private_network_prefix: "{{ condor_ip_range }}"
workers_group_name: "workers"

# cluster utilization stats collection
collect_status: false
5 changes: 5 additions & 0 deletions tf/ansible/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,8 @@
notify: restart pulsar
become: yes

- name: import cluster status tasks
import_tasks: status.yml
when: collect_status
become: yes

16 changes: 16 additions & 0 deletions tf/ansible/status.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
- name: Change permissions of condor status script files
file:
path: "{{ pulsar_data_path }}/{{ item.name }}"
mode: "{{ item.mode }}"
owner: pulsar
group: pulsar
with_items:
- {'name': scripts/producer.py, 'mode': '0700'}
- {'name': scripts/cluster_util_condor.sh, 'mode': '0700'}

- name: Add cron to collect stats
cron:
name: "condor stats"
minute: "*"
job: "/opt/pulsar/venv3/bin/python {{ pulsar_data_path }}/scripts/producer.py {{ pulsar_root }}/config/app.yml {{ pulsar_data_path }}/scripts/cluster_util_condor.sh"