diff --git a/bin/qtl-reheat b/bin/qtl-reheat index 86cd97227..578900c6c 100755 --- a/bin/qtl-reheat +++ b/bin/qtl-reheat @@ -16,6 +16,10 @@ declare -A REHEAT_METHODS=( # default options dataset=train +declare -A modes +for key in submit; do + modes[$key]=false +done # usage sep="================================================================" @@ -37,6 +41,8 @@ $(for key in "${!REHEAT_METHODS[@]}"; do printf "%24s %-11s %-s\n" "" "$key" "${ OPTIONAL OPTIONS: -d [DATASET] unique name for this dataset default: $dataset + --submit submit the slurm jobs, rather than just + printing the \`sbatch\` command """ >&2 } if [ $# -lt 1 ]; then @@ -48,7 +54,7 @@ fi inputDir="" outputDir="" cmd="" -while getopts "i:o:c:d:h" opt; do +while getopts "i:o:c:d:h-:" opt; do case $opt in i) inputDir=$OPTARG ;; o) outputDir=$OPTARG ;; @@ -61,6 +67,12 @@ while getopts "i:o:c:d:h" opt; do usage exit 101 ;; + -) + for key in "${!modes[@]}"; do + [ "$key" == "$OPTARG" ] && modes[$OPTARG]=true && break + done + [ -z "${modes[$OPTARG]-}" ] && printError "unknown option --$OPTARG" && exit 100 + ;; *) exit 100 ;; esac done @@ -159,6 +171,12 @@ EOF echo """ SLURM SCRIPT: $slurmScript JOB LIST: $jobList -Now submitting! """ -sbatch $slurmScript +if ${modes['submit']}; then + echo "Now submitting!" 
+ sbatch $slurmScript +else + echo """Run this command to submit: + sbatch $slurmScript + """ +fi diff --git a/qa-physics/charge_analysis/.gitignore b/qa-physics/charge_analysis/.gitignore new file mode 100644 index 000000000..8e0855fa4 --- /dev/null +++ b/qa-physics/charge_analysis/.gitignore @@ -0,0 +1,2 @@ +__pycache__ +*.png diff --git a/qa-physics/charge_analysis/README.md b/qa-physics/charge_analysis/README.md new file mode 100644 index 000000000..34b0de095 --- /dev/null +++ b/qa-physics/charge_analysis/README.md @@ -0,0 +1,10 @@ +# Charge Analysis + +From Bhawani Singh + +```bash +./analyze HIPO_FILE +``` +Produces PNG files comparing the DAQ-gated FC charge determined from +- directly from the `RUN::scaler` bank +- from the livetime and ungated charge, by multiplication diff --git a/qa-physics/charge_analysis/analyze.py b/qa-physics/charge_analysis/analyze.py new file mode 100755 index 000000000..670d344ce --- /dev/null +++ b/qa-physics/charge_analysis/analyze.py @@ -0,0 +1,277 @@ +#!/usr/bin/env python3 +################################################################################## +# ADAPTED FROM BHAWANI SINGH's ORIGINAL SCRIPT: +# /w/hallb-scshelf2102/clas12/singh/Softwares/QADB_studies/python/main2.py +################################################################################## +import numpy as np +import os +import sys +import logging +from glob import glob +import matplotlib.pyplot as plt +import hipolib + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +# plt.style.use('seaborn-darkgrid') + +def main(): + + if len(sys.argv) != 2: + print(f'USAGE: {sys.argv[0]} [HIPO_FILE]') + exit(2) + hipo_file = sys.argv[1] + + hipo_prefix = os.getenv('HIPO') + if hipo_prefix == None: + raise ValueError("HIPO env var not set") + + logger.info(f"Processing file: {hipo_file}") + + reader = hipolib.hreader(f'{hipo_prefix}/lib') + 
reader.open_with_tag(hipo_file, 1) # filter by tag at open time + reader.define('RUN::config') + reader.define('RUN::scaler') + + timestamps, fcups, fcupgateds, live_times = [], [], [], [] + + counter = 0 + while reader.next(): + if counter % 10000 == 0 and counter > 0: + logger.info(f'Processing event # {counter}') + counter += 1 + + if reader.getSize('RUN::config') == 0 or reader.getSize('RUN::scaler') == 0: + # logger.warning(f"Skipping empty bank at event {counter}") + continue + + timestamp = reader.getEntry('RUN::config', 'timestamp') + fcup = reader.getEntry('RUN::scaler', 'fcup') + fcupgated = reader.getEntry('RUN::scaler', 'fcupgated') + live_time = reader.getEntry('RUN::scaler', 'livetime') + + timestamps.append(timestamp[0]) + fcups.append(fcup[0]) + fcupgateds.append(fcupgated[0]) + live_times.append(live_time[0]) + + logger.info(f"Processed {counter} events.") + + # Sort data by timestamps + sorted_data = sorted(zip(timestamps, fcups, fcupgateds, live_times)) + if not sorted_data: + raise ValueError("No data to plot.") + + timestamps, fcups, fcupgateds, live_times = zip(*sorted_data) + + timestamps = np.array(timestamps) + fcups = np.array(fcups) + fcupgateds = np.array(fcupgateds) + live_times = np.array(live_times) + + run_number = os.path.splitext(os.path.basename(hipo_file))[0] + + # ---------- Plot 1: Per-event data ---------- + # ---------- Plot 1: Per-event data ---------- + fig1, axs1 = plt.subplots(2, 2, figsize=(14, 8)) + fig1.suptitle(f'Run {run_number} - Event-Level Detector Data', fontsize=16) + + plots1 = [ + (axs1[0, 0], fcups, 'FCUP', 'FCUP vs Timestamp', 'darkgreen', 'line'), + (axs1[0, 1], fcupgateds, 'FCUP Gated', 'FCUP Gated vs Timestamp', 'darkorange', 'line'), + (axs1[1, 0], live_times, 'Live Time', 'Live Time vs Timestamp', 'purple', 'scatter'), + (axs1[1, 1], fcups * live_times, 'FCUP × Live Time', 'FCUP × Live Time vs Timestamp', 'steelblue', 'line'), + ] + + for ax, data, label, title, color, style in plots1: + if style == 
'line': + ax.plot(timestamps, data, label=label, color=color, linewidth=1.5) + elif style == 'scatter': + ax.scatter(timestamps, data, label=label, color=color, s=10, alpha=0.7) + + ax.set_title(title, fontsize=12) + ax.set_xlabel('Timestamp', fontsize=10) + ax.set_ylabel(label, fontsize=10) + ax.legend(fontsize=9) + ax.grid(True, linestyle='--', alpha=0.6) + ax.tick_params(axis='both', labelsize=9) + + fig1.tight_layout(rect=[0, 0.03, 1, 0.95]) + fig1.savefig(f'fcup_vs_timestamp_{run_number}.png', bbox_inches='tight', dpi=300) + plt.close(fig1) + # ---------- Compute Chunked FCUP Gated with neighbor handling ---------- + chunk_size = 2000 + num_chunks = len(timestamps) // chunk_size + + chunk_caseA, chunk_caseB, chunk_caseC, chunk_default = [], [], [], [] + cum_caseA, cum_caseB, cum_caseC, cum_default = [], [], [], [] + chunk_indices, skipped_counts = [], [] + + runA, runB, runC, runDef = 0, 0, 0, 0 + total_skipped = 0 + + corrected_livetimes_A = [] + corrected_livetimes_B = [] + corrected_livetimes_C = [] + + for i in range(num_chunks): + start = i * chunk_size + end = start + chunk_size + if end >= len(fcups): + break + + # use np.diff for correct increments + fcup_diff = np.diff(fcups[start:end]) + fcupgated_diff = np.diff(fcupgateds[start:end]) + live_sub = live_times[start+1:end] + + sumA, sumB, sumC, sumDef = 0, 0, 0, 0 + skipped_in_chunk = 0 + + for j, lt in enumerate(live_sub): + if lt > 0: + # Case A + sumA += lt * fcup_diff[j] + corrected_livetimes_A.append(lt) + # Case B + sumB += lt * fcup_diff[j] + corrected_livetimes_B.append(lt) + # Case C + sumC += lt * fcup_diff[j] + corrected_livetimes_C.append(lt) + # Default + sumDef += fcupgated_diff[j] + else: + # ----- Case A/B nearest-neighbor substitution ----- + idx_candidates = [] + if j - 1 >= 0 and live_sub[j - 1] > 0: + idx_candidates.append(j - 1) + if j + 1 < len(live_sub) and live_sub[j + 1] > 0: + idx_candidates.append(j + 1) + + if idx_candidates: + nn = min( + idx_candidates, + key=lambda k: 
abs(timestamps[start + 1 + k] - timestamps[start + 1 + j]) + ) + lt_nn = live_sub[nn] + + # Case A + sumA += lt_nn * fcup_diff[j] + corrected_livetimes_A.append(lt_nn) + + # Case B + sumB += lt_nn * fcupgated_diff[nn] + corrected_livetimes_B.append(lt_nn) + + # Default + sumDef += fcupgated_diff[j] + else: + skipped_in_chunk += 1 + total_skipped += 1 + logger.warning( + f"No valid positive LT neighbor at chunk {i}, local index {j}, " + f"timestamp {timestamps[start+1+j]}" + ) + + # ----- Case C: mean of ±20 neighbors ----- + window = 10 + idx_range = range(max(0, j - window), min(len(live_sub), j + window + 1)) + neigh_lts = [live_sub[k] for k in idx_range if live_sub[k] > 0] + if neigh_lts: + lt_mean = np.mean(neigh_lts) + sumC += lt_mean * fcup_diff[j] + corrected_livetimes_C.append(lt_mean) + + runA += sumA + runB += sumB + runC += sumC + runDef += sumDef + + chunk_caseA.append(sumA) + chunk_caseB.append(sumB) + chunk_caseC.append(sumC) + chunk_default.append(sumDef) + cum_caseA.append(runA) + cum_caseB.append(runB) + cum_caseC.append(runC) + cum_default.append(runDef) + chunk_indices.append(i) + skipped_counts.append(skipped_in_chunk) + + logger.info(f"Computed chunked FCUP Gated values with neighbor handling (Cases A, B, C).") + logger.info(f"Total skipped events (no valid LT neighbor): {total_skipped}") + + # ---------- Plot 2: Chunked FCUP Gated + Ratios + Skips + LT Distribution ---------- + fig2, (ax_top, ax_mid, ax_bottom, ax_ltdist) = plt.subplots( + 4, 1, figsize=(12, 14), sharex=False, + gridspec_kw={'height_ratios': [3, 1, 1, 2]} + ) + fig2.suptitle(f'Run {run_number} - Chunked FCUP Gated (Neighbor Handling)', fontsize=16) + + # Top: cumulative sums + ax_top.plot(chunk_indices, cum_caseA, label='Cumulative Case A (LT_nn × FCUPungated)', color='darkred', marker='o') + #ax_top.plot(chunk_indices, cum_caseB, label='Cumulative Case B (LT_nn × FCUPungated_nn)', color='darkgreen', marker='s') + ax_top.plot(chunk_indices, cum_caseC, label='Cumulative Case C 
(20-NN mean × FCUPungated)', color='darkorange', marker='d') + ax_top.plot(chunk_indices, cum_default, label='Cumulative Default (FCUPgated)', color='blue', marker='^') + ax_top.set_ylabel('Cumulative Σ', fontsize=11) + ax_top.grid(True, linestyle='--', alpha=0.6) + ax_top.legend(fontsize=10) + ax_top.tick_params(axis='both', labelsize=10) + + # Middle: ratios wrt default + ratioA = np.divide(cum_caseA, cum_default, out=np.full_like(cum_caseA, np.nan, dtype=float), where=np.array(cum_default) != 0) + #ratioB = np.divide(cum_caseB, cum_default, out=np.full_like(cum_caseB, np.nan, dtype=float), where=np.array(cum_default) != 0) + ratioC = np.divide(cum_caseC, cum_default, out=np.full_like(cum_caseC, np.nan, dtype=float), where=np.array(cum_default) != 0) + + ax_mid.plot(chunk_indices, ratioA, label='Case A / Default', color='darkred', marker='o') + #ax_mid.plot(chunk_indices, ratioB, label='Case B / Default', color='darkgreen', marker='s') + ax_mid.plot(chunk_indices, ratioC, label='Case C / Default', color='darkorange', marker='d') + ax_mid.axhline(1.0, color='black', linestyle='--', linewidth=1) + ax_mid.set_ylabel('Ratio', fontsize=11) + ax_mid.grid(True, linestyle='--', alpha=0.6) + ax_mid.legend(fontsize=10) + ax_mid.tick_params(axis='both', labelsize=10) + + # Bottom-1: skipped events count + ax_bottom.bar(chunk_indices, skipped_counts, color='gray', alpha=0.7) + ax_bottom.set_xlabel(f'Chunk Index (Each = {chunk_size} events)', fontsize=11) + ax_bottom.set_ylabel('# Skipped', fontsize=11) + ax_bottom.grid(True, linestyle='--', alpha=0.6) + ax_bottom.tick_params(axis='both', labelsize=10) + + # Bottom-2: livetime distributions + bins = 100 + mean_raw, sigma_raw = np.mean(live_times), np.std(live_times) + mean_A, sigma_A = np.mean(corrected_livetimes_A), np.std(corrected_livetimes_A) + mean_B, sigma_B = np.mean(corrected_livetimes_B), np.std(corrected_livetimes_B) + mean_C, sigma_C = np.mean(corrected_livetimes_C), np.std(corrected_livetimes_C) + + 
ax_ltdist.hist(live_times, bins=bins, alpha=0.4, + label=f'Raw LT (μ={mean_raw:.3f}, σ={sigma_raw:.3f})', color='purple') + ax_ltdist.hist(corrected_livetimes_A, bins=bins, alpha=0.4, + label=f'Case A LT (μ={mean_A:.3f}, σ={sigma_A:.3f})', color='red') + #ax_ltdist.hist(corrected_livetimes_B, bins=bins, alpha=0.4, + # label=f'Case B LT (μ={mean_B:.3f}, σ={sigma_B:.3f})', color='green') + ax_ltdist.hist(corrected_livetimes_C, bins=bins, alpha=0.4, + label=f'Case C LT (μ={mean_C:.3f}, σ={sigma_C:.3f})', color='orange') + + ax_ltdist.set_xlabel('Live Time', fontsize=11) + ax_ltdist.set_ylabel('Counts', fontsize=11) + ax_ltdist.legend(fontsize=9) + ax_ltdist.grid(True, linestyle='--', alpha=0.6) + ax_ltdist.tick_params(axis='both', labelsize=10) + + fig2.tight_layout(rect=[0, 0.03, 1, 0.95]) + fig2.savefig(f'chunked_fcupgated_comparison_{run_number}.png', bbox_inches='tight', dpi=300) + plt.close(fig2) + + +if __name__ == "__main__": + main() + logger.info("HipoReader example completed.") diff --git a/qa-physics/charge_analysis/hipolib.py b/qa-physics/charge_analysis/hipolib.py new file mode 100644 index 000000000..1407708a2 --- /dev/null +++ b/qa-physics/charge_analysis/hipolib.py @@ -0,0 +1,146 @@ +################################################################## +## Python interface for reading HIPO files. ## +## This interface is using fusion class from the hipo4 module ## +## with "C" extern intraface defined in wrapper.cpp class. ## +## the fusion interface allows opening multiple files through ## +## providing unique handles for each opened file. 
## +## ## +## Author: G.Gavalian (2022) ## +## Jefferson Lab -------------------------------- ## +################################################################## + +import ctypes + +class hreader: + + def __init__(self,libfilename): + """ hipo reader python class interface to hipo fusion wrapper + (use it wisely) + """ + self.libPath = libfilename + self.hipo4lib = ctypes.CDLL(self.libPath+'/libhipo4.so') + self.status = ctypes.c_int(0) + self.handle = -1 + + def open(self,filename): + """ open the file and save the handle returned by fusion wrapper. + each opened file has unique handle, so many files can be opened + in parallel. + """ + self.inputFile = filename + self.handle = self.hipo4lib.fusion_open(ctypes.c_char_p(self.inputFile.encode('ascii'))) + print('file open handle = ',self.handle) + + def open_with_tag(self,filename, tag): + """ open the file and save the handle returned by fusion wrapper. + each opened file has unique handle, so many files can be opened + in parallel. + """ + self.inputFile = filename + self.handle = self.hipo4lib.fusion_open_with_tag(ctypes.c_char_p(self.inputFile.encode('ascii')), tag) + print('file open handle = ',self.handle) + + def define(self,bankname): + """ define is used to declare a bank that will be read by the fusion wrapper + each time next() is called. The banks are stored in the internal map for + each opened file handle. + """ + self.hipo4lib.fusion_define(self.handle, ctypes.c_char_p(bankname.encode('ascii'))) + + def describe(self,bankname): + """ define is used to declare a bank that will be read by the fusion wrapper + each time next() is called. The banks are stored in the internal map for + each opened file handle. + """ + self.hipo4lib.fusion_describe(self.handle,ctypes.c_char_p(bankname.encode('ascii'))) + + def getSize(self,bankname): + """ returns size of the bank that was read for current event. 
+        """
+        size = self.hipo4lib.fusion_bankSize(self.handle,ctypes.c_char_p(bankname.encode('ascii')))
+        return size
+
+    def next(self):
+        """ reads next event in the file, and reads all the banks that were declared with
+            hreader.define(bankname) function call.
+        """
+        status = self.hipo4lib.fusion_next(self.handle)
+        return status==1
+
+    def getInt(self,bank,entry,row):
+        """ returns an integer value for entry and row from requested bank. call getSize()
+            first to make sure that the row is within the allowable range to avoid hard crashes.
+        """
+        a1 = ctypes.c_char_p(bank.encode('ascii'))
+        a2 = ctypes.c_char_p(entry.encode('ascii'))
+        self.hipo4lib.fusion_get_float.restype = ctypes.c_int  # NOTE(review): sets restype on fusion_get_float but fusion_get_int is called below -- likely meant fusion_get_int.restype; confirm
+        value = self.hipo4lib.fusion_get_int(self.handle,a1,a2,row)
+        return value
+
+    def getLong(self,bank,entry,row):
+        """ returns a long integer value for entry and row from requested bank. call getSize()
+            first to make sure that the row is within the allowable range to avoid hard crashes.
+        """
+        a1 = ctypes.c_char_p(bank.encode('ascii'))
+        a2 = ctypes.c_char_p(entry.encode('ascii'))
+        self.hipo4lib.fusion_get_long.restype = ctypes.c_ulonglong
+        value = self.hipo4lib.fusion_get_long(self.handle,a1,a2,row)
+        return value
+
+    def getFloat(self,bank,entry,row):
+        """ returns a float value for entry and row from requested bank. call getSize()
+        """
+        a1 = ctypes.c_char_p(bank.encode('ascii'))
+        a2 = ctypes.c_char_p(entry.encode('ascii'))
+        self.hipo4lib.fusion_get_float.restype = ctypes.c_float
+        value = ctypes.c_float(self.hipo4lib.fusion_get_float(self.handle,a1,a2,row)).value
+        return value
+
+    def getDouble(self,bank,entry,row):
+        """ returns a double value for entry and row from requested bank.
call getSize()
+        """
+        a1 = ctypes.c_char_p(bank.encode('ascii'))
+        a2 = ctypes.c_char_p(entry.encode('ascii'))
+        self.hipo4lib.fusion_get_float.restype = ctypes.c_double  # NOTE(review): calls fusion_get_float with a double restype -- confirm the C wrapper returns double here
+        value = ctypes.c_double(self.hipo4lib.fusion_get_float(self.handle,a1,a2,row)).value
+        return value
+
+    def getType(self,bank,entry):
+        """ returns type of the entry for the given bank. this is used to determine whether to
+            use getInt or getFloat function.
+        """
+        a1 = ctypes.c_char_p(bank.encode('ascii'))
+        a2 = ctypes.c_char_p(entry.encode('ascii'))
+        type = self.hipo4lib.fusion_entry_type(self.handle,a1,a2)
+        return type
+
+    def getEntry(self,bank,entry):
+        """ returns a python array containing all rows for given column. the determination of the
+            type is done internally.
+        """
+        rows = self.getSize(bank)
+        type = self.getType(bank,entry)
+        array = []
+        if(type==1 or type==2 or type==3):
+            for row in range(rows):
+                array.append(self.getInt(bank,entry,row))
+        if(type==4):
+            for row in range(rows):
+                array.append(self.getFloat(bank,entry,row))
+        if(type==5):
+            for row in range(rows):
+                array.append(self.getDouble(bank,entry,row))
+        if(type==8):
+            for row in range(rows):
+                array.append(self.getLong(bank,entry,row))
+
+        return array
+
+    def get_entries(self):
+        """Returns the total number of entries (events) in the currently opened file."""
+        self.hipo4lib.fusion_get_entries.restype = ctypes.c_int
+        return self.hipo4lib.fusion_get_entries(self.handle)
+
+    def show(self, bank: str):
+        self.hipo4lib.fusion_show.restype = ctypes.c_voidp  # NOTE(review): ctypes provides c_void_p, not c_voidp -- this attribute lookup looks like it would fail at runtime; confirm
+        self.hipo4lib.fusion_show(self.handle, ctypes.c_char_p(bank.encode('ascii')))
\ No newline at end of file
diff --git a/qadb/notes/rga_sp18.md b/qadb/notes/rga_sp18.md
index bad438071..aa72bcbcb 100644
--- a/qadb/notes/rga_sp18.md
+++ b/qadb/notes/rga_sp18.md
@@ -33,6 +33,8 @@ qtl reheat -c rollover -d rga_sp18_outbending_nSidis -o /volatile/clas12/users/$
 qtl reheat -c rollover -d rga_sp18_inbending_nSidis -o 
/volatile/clas12/users/$LOGNAME/reheat/rga_sp18_inbending_nSidis -i /cache/clas12/rg-a/production/recon/spring2018/10.59gev/torus-1/pass1/dst/train/nSidis ``` +3. check the results on some runs; see [`qa-physics/charge_analysis/README.md`](/qa-physics/charge_analysis/README.md) + ## Run monitoring > [!IMPORTANT] diff --git a/util/run-here.sh b/util/run-here.sh new file mode 100755 index 000000000..2e63f2e3c --- /dev/null +++ b/util/run-here.sh @@ -0,0 +1,109 @@ +#!/usr/bin/env bash +set -euo pipefail +offset=0 +dry=false +NUM_THREADS=4 # number of parallel jobs +usage() { + echo """ + Run jobs from a slurm submission script's job list on + the current interactive node; be nice and keep NUM_JOBS + low, since this is only meant for rapid testing + + USAGE: $0 [JOB_LIST] [LOG_DIR] [NUM_JOBS] [OFFSET] [DRY] + JOB_LIST file with job scripts, one per line + LOG_DIR directory for log files (clobbered) + NUM_JOBS the number of parallel jobs to run + OFFSET offset of the first job; NUM_JOBS + consecutive lines will be used + default: $offset + DRY set to '1' for a dry-run (no submission) + default: run immediately + """ +} + +if [ $# -lt 3 ]; then + usage + exit 2 +fi +job_list=$1 +log_dir=$2 +num_jobs=$3 +[ $# -ge 4 ] && offset=$4 +[ $# -ge 5 ] && dry=true + +if [ ! -f "$job_list" ]; then + echo "ERROR: File '$job_list' not found" + exit 1 +fi + +mkdir -p $log_dir + +job_ids=() + +function cleanup_jobs() { + echo "" + echo ">>> Caught signal, killing all jobs..." + for job_id in "${job_ids[@]}"; do + if ps -p "$job_id" >& /dev/null; then + # Kill the entire process group (negative PID) + kill -- -"$job_id" 2>/dev/null || true # SIGTERM + fi + done + sleep 1 + # SIGKILL, if still alive + for job_id in "${job_ids[@]}"; do + if ps -p "$job_id" >& /dev/null; then + kill -9 -- -"$job_id" 2>/dev/null || true + fi + done + echo """>>> All jobs killed. 
+ To check if any remain: + ps -ef | grep $(whoami) + Kill zombies with, e.g.,: + pkill -u $(whoami) java + """ + exit 1 +} +trap cleanup_jobs SIGINT SIGTERM + +function wait_for_jobs() { + stat=10 + while [ "${#job_ids[@]}" -gt $1 ]; do + for i in "${!job_ids[@]}"; do + if [ "$1" -eq 0 ]; then + if [ "${#job_ids[@]}" -lt $stat ]; then + echo ">>> $(date) >>> waiting on ${#job_ids[@]} jobs" + stat=${#job_ids[@]} + fi + fi + set +e + ps ${job_ids[$i]} >& /dev/null + if [ "$?" -ne 0 ]; then + echo ">>> jobid ${job_ids[$i]} finished." + unset job_ids[$i] + fi + set -e + done + sleep 1 + done +} + +j=0 +echo "===== JOBS: =====" +while IFS= read -r cmd; do + j=$((j+1)) + echo "JOB $j: $cmd" + if ! $dry; then + setsid $cmd > $log_dir/job.$j.out 2> $log_dir/job.$j.err & + job_ids+=($!) + wait_for_jobs $((NUM_THREADS-1)) + fi +done < <(tail -n +$((offset + 1)) $job_list | head -n $num_jobs) +wait_for_jobs 0 + +echo "=================" +if $dry; then + echo "THIS WAS A DRY-RUN; no jobs submitted" +else + echo "DONE!" +fi