-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathai_guard.py
More file actions
executable file
·188 lines (157 loc) · 6.21 KB
/
Copy pathai_guard.py
File metadata and controls
executable file
·188 lines (157 loc) · 6.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
#!/usr/bin/env python
"""
AI Guard: Secure Dynamic Write Access Controller (v5 - Final)
============================================================
WHAT IS THIS?
A security-focused utility for Linux to manage directory permissions for AI
coding tools running in containers. It allows you to toggle specific
directories between Read-Only (RO) and Read-Write (RW) without
restarting the container or modifying the AI tool.
HOW IT WORKS:
1. Start your container with the project root mounted as 'ro,rshared'.
2. AI Guard bind-mounts a directory onto itself and remounts it read-write,
creating a "writable island" on top of the read-only foundation.
3. Because of 'rshared', the container sees the permission change instantly.
PREREQUISITES:
- Linux Host.
- Project mounted in Docker/Podman with: -v /path:/workspace:ro,rshared
- Sudoers entry for passwordless use (optional but recommended).
USAGE:
Unlock a dir: sudo python3 ai_guard.py --allow ./src
Lock a dir: sudo python3 ai_guard.py --deny ./src
Force Lock: sudo python3 ai_guard.py --deny ./src --force
Reset all: sudo python3 ai_guard.py --reset
POTENTIAL PROBLEMS & SOLUTIONS:
- Problem: "Target is Busy" error when locking.
Cause: The AI tool or a terminal is currently "inside" that folder.
Solution: Use the --force flag to kill processes using that directory.
- Problem: Changes aren't appearing in the container.
Cause: Container was started without the 'rshared' propagation flag.
Solution: Restart container with ':ro,rshared' in the volume string.
"""
import subprocess
import argparse
import os
import sys
import re
from datetime import datetime
# Audit log: log_event() appends to it, and reset_all() reads it back to find
# the directories that were unlocked.
LOG_FILE = "/var/log/ai_guard.log"
# Absolute paths of the external tools handed to run_secure_cmd(), so the
# commands are not resolved through PATH.
MOUNT_BIN = "/usr/bin/mount"
UMOUNT_BIN = "/usr/bin/umount"
FUSER_BIN = "/usr/bin/fuser"
def decode_kernel_path(path_str):
    """Decode the octal escapes used in kernel mount tables.

    Mount-table fields encode awkward characters as a backslash followed by
    three octal digits (for example ``\\040`` for a space).  Only those
    three-digit octal sequences are translated; every other character is
    passed through untouched.

    Args:
        path_str: A path field as read from a mount table.

    Returns:
        The decoded path, or ``path_str`` unchanged when an escape is out of
        byte range or the decoded bytes are not valid UTF-8.
    """
    def _octal_to_byte(match):
        # bytes([...]) raises ValueError for values above 255 (e.g. \777).
        return bytes([int(match.group(1), 8)])

    try:
        # Substitute on bytes so that escaped bytes belonging to a multi-byte
        # UTF-8 character are reassembled before decoding.
        raw = path_str.encode("utf-8")
        return re.sub(rb"\\([0-7]{3})", _octal_to_byte, raw).decode("utf-8")
    except ValueError:
        # ValueError also covers UnicodeError.  Fall back to the raw text
        # rather than aborting the caller's scan of the mount table.
        return path_str
def log_event(message):
    """Append a timestamped, single-line entry to the audit log.

    Newlines and carriage returns in *message* are replaced by spaces so one
    event always occupies exactly one line of LOG_FILE.  The file is created
    with mode 0o600 if it does not exist yet.

    Logging is best-effort: a failure to write never raises, but it is
    reported on stderr because reset_all() relies on this log to find the
    directories that were unlocked.

    Args:
        message: Text of the event to record.
    """
    clean_msg = message.replace('\n', ' ').replace('\r', ' ')
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    try:
        # Create-and-append in a single call; no separate existence check
        # that could race with another process creating the file.
        fd = os.open(LOG_FILE, os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o600)
        with os.fdopen(fd, "a") as f:
            f.write(f"[{timestamp}] {clean_msg}\n")
    except (OSError, UnicodeError) as exc:
        print(f"Warning: could not write to {LOG_FILE}: {exc}", file=sys.stderr)
def get_mount_info():
    """List the mount points visible to this process with their write state.

    Reads /proc/self/mountinfo.  In each line the fifth whitespace-separated
    field is the mount point and the sixth is the comma-separated per-mount
    option list.  A mount counts as read-write only when that option list
    contains the exact option ``rw``.  The per-superblock options later in
    the line are deliberately not consulted: they describe the filesystem,
    not this particular mount, and usually contain ``rw`` even for a
    read-only bind mount.

    Returns:
        A list of ``(mount_point, is_rw)`` tuples in file order, with the
        mount point decoded by decode_kernel_path().  The list is empty or
        partial if the file cannot be read; the error is printed.
    """
    mounts = []
    try:
        with open("/proc/self/mountinfo", "r") as f:
            for line in f:
                fields = line.split()
                if len(fields) < 6:
                    # Too short to carry both a mount point and its options.
                    continue
                mount_options = fields[5].split(",")
                mounts.append((decode_kernel_path(fields[4]), "rw" in mount_options))
    except (OSError, UnicodeError) as e:
        print(f"Kernel Error: {e}")
    return mounts
def check_status(target_path):
    """Report whether *target_path* is mounted and whether it is writable.

    Args:
        target_path: Directory to inspect; it is resolved with
            os.path.realpath() before being compared to the mount table.

    Returns:
        A ``(is_mounted, is_rw, reason)`` tuple:

        * ``(True, is_rw, "direct")`` when the path itself is a mount point.
        * ``(True, True, "inherited from <mount>")`` when the nearest
          enclosing mount point is read-write.
        * ``(False, False, "none")`` otherwise.
    """
    real_target = os.path.realpath(target_path)
    mounts = get_mount_info()

    # Several mounts can be stacked on one path.  Search from the end of the
    # table so the most recently listed entry (the one on top) decides.
    for m_path, is_rw in reversed(mounts):
        if m_path == real_target:
            return True, is_rw, "direct"

    # Otherwise the nearest enclosing mount governs the path: a read-write
    # ancestor further up does not help if a closer mount is read-only.
    # NOTE: a mount at "/" never matches here because "/" + os.sep is "//".
    nearest = None
    for m_path, is_rw in mounts:
        if real_target.startswith(m_path + os.sep):
            if nearest is None or len(m_path) >= len(nearest[0]):
                nearest = (m_path, is_rw)

    if nearest is not None and nearest[1]:
        return True, True, f"inherited from {nearest[0]}"
    return False, False, "none"
def run_secure_cmd(args):
    """Run an external command without a shell and report the outcome.

    Args:
        args: Argument vector (list of strings).  It is executed directly,
            so no shell quoting or expansion takes place.

    Returns:
        ``(True, "")`` when the command exits with status 0.  Otherwise
        ``(False, text)`` where *text* is the command's stripped stderr, or
        its stdout if stderr is empty, or the exit status if both are empty.
        If the command cannot be started at all (missing binary, permission
        denied) *text* is the operating-system error message.
    """
    try:
        subprocess.run(args, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as e:
        detail = (e.stderr or "").strip() or (e.stdout or "").strip()
        return False, detail or f"exit status {e.returncode}"
    except OSError as e:
        # For example FileNotFoundError when the tool is not installed at the
        # expected path; report it like any other failure instead of crashing.
        return False, str(e)
    return True, ""
def allow_write(path):
    """Unlock *path* for writing by layering a read-write bind mount on it.

    Steps:
      1. Resolve the path and require that it is a directory.
      2. Do nothing if check_status() already reports it as writable.
      3. Unless the path is already a mount point, bind-mount it onto itself.
      4. Remount that bind mount read-write and record the event in the log.

    If step 4 fails and the bind mount was created by this call, the bind
    mount is removed again so no half-configured mount is left behind.

    Args:
        path: Directory to unlock.
    """
    real_path = os.path.realpath(path)
    if not os.path.isdir(real_path):
        print(f"Error: {real_path} is not a directory.")
        return

    is_mounted, is_rw, reason = check_status(real_path)
    if is_rw:
        print(f"Verified: Already writable ({reason}).")
        return

    created_bind = not (is_mounted and reason == "direct")
    if created_bind:
        ok, err = run_secure_cmd([MOUNT_BIN, "--bind", "--", real_path, real_path])
        if not ok:
            print(f"Mount failed: {err}")
            return

    # "remount,bind" changes only the flags of this mount point.  A plain
    # "remount,rw" is applied to the whole filesystem, which is wider than
    # the single directory being unlocked.
    ok, err = run_secure_cmd([MOUNT_BIN, "-o", "remount,bind,rw", "--", real_path])
    if ok:
        print(f"🔓 Unlocked: {real_path}")
        log_event(f"ALLOWED: {real_path}")
        return

    print(f"Remount Error: {err}")
    if created_bind:
        undone, undo_err = run_secure_cmd([UMOUNT_BIN, "--", real_path])
        if not undone:
            print(f"Warning: could not remove the bind mount on {real_path}: {undo_err}")
def deny_write(path, force=False):
    """Lock *path* again by unmounting the mount that sits directly on it.

    Args:
        path: Directory to lock; resolved with os.path.realpath().
        force: When true, first run ``fuser -k -m`` on the path to kill the
            processes using it.  The result of that command is not checked.

    The function prints the outcome.  A successful unmount is also written
    to the log; nothing is done when the path is not itself a mount point.
    """
    real_path = os.path.realpath(path)

    status = check_status(real_path)
    if status[2] != "direct":
        print(f"Error: {real_path} is not an active AI Guard mount.")
        return

    if force:
        print(f"Force-clearing processes using {real_path}...")
        run_secure_cmd([FUSER_BIN, "-k", "-m", real_path])

    unmounted, failure = run_secure_cmd([UMOUNT_BIN, "--", real_path])
    if not unmounted:
        print(f"CRITICAL: Failed to lock {real_path}.\nReason: {failure}")
        if "target is busy" in failure.lower():
            print("Tip: Use --force to kill processes holding this directory open.")
        return

    print(f"🔒 Locked: {real_path}")
    log_event(f"REVOKED: {real_path}")
def reset_all():
    """Lock every directory recorded as unlocked in the log.

    The paths are taken from the ``ALLOWED:`` entries of LOG_FILE.  Each one
    that is currently a mount point is passed to deny_write() without force.
    Longer paths are handled first so a nested mount is removed before the
    mount that contains it.

    The log is cleared only when every unmount succeeded.  If any directory
    could not be locked, the log is kept so a later reset can still find it,
    and the remaining directories are listed.
    """
    def _mounts_at(path):
        # Number of mount-table entries sitting exactly on *path*.
        real = os.path.realpath(path)
        return sum(1 for m_path, _ in get_mount_info() if m_path == real)

    paths = set()
    if os.path.exists(LOG_FILE):
        with open(LOG_FILE, "r") as f:
            for line in f:
                m = re.search(r"ALLOWED: (.*)", line)
                if m:
                    paths.add(m.group(1).strip())

    still_unlocked = []
    for p in sorted(paths, key=len, reverse=True):
        before = _mounts_at(p)
        if before == 0:
            continue
        deny_write(p, force=False)
        # deny_write() only prints its result, so confirm the unmount by
        # checking that one mount entry actually disappeared.
        if _mounts_at(p) >= before:
            still_unlocked.append(p)

    if still_unlocked:
        print("Warning: reset incomplete. These directories are still unlocked:")
        for p in still_unlocked:
            print(f"  {p}")
        print(f"{LOG_FILE} was kept; retry with --deny DIR --force, then --reset.")
        return

    with open(LOG_FILE, 'w'):
        pass
    print("✨ Reset complete. System back to default Read-Only.")
def main():
    """Parse the command line and dispatch to the requested action.

    Exactly one of --allow, --deny, --status or --reset must be given;
    --force is only passed on to deny_write().  The process exits with
    status 1 when it is not running with an effective UID of 0.
    """
    parser = argparse.ArgumentParser(description="AI Guard v5")
    actions = parser.add_mutually_exclusive_group(required=True)
    for long_flag, short_flag in (("--allow", "-a"), ("--deny", "-d"), ("--status", "-s")):
        actions.add_argument(long_flag, short_flag, metavar='DIR')
    actions.add_argument('--reset', action='store_true')
    parser.add_argument(
        '--force', '-f',
        action='store_true',
        help='Force unmount by killing active processes',
    )
    args = parser.parse_args()

    if os.geteuid() != 0:
        print("Fatal: Run with sudo.")
        sys.exit(1)

    # Same precedence as the option checks have always had: reset, status,
    # allow, deny.
    if args.reset:
        reset_all()
        return
    if args.status:
        _, writable, why = check_status(args.status)
        label = 'RW' if writable else 'RO'
        print(f"State: {label} ({why})")
        return
    if args.allow:
        allow_write(args.allow)
        return
    if args.deny:
        deny_write(args.deny, force=args.force)


if __name__ == "__main__":
    main()