11import pytest
22import json
3+ import shlex
4+ import sqlite3
5+ import tempfile
6+ from pathlib import Path
7+
38from ark .webapp .crypto import encrypt_text , decrypt_text
49
510class TestCredentialSecurity :
@@ -15,30 +20,30 @@ def test_users(self):
1520 def test_encryption_isolation (self , test_users ):
1621 """Verify that identical plaintexts result in different ciphertexts for different users."""
1722 test_secret = "sk-ant-1234567890abcdef"
18-
23+
1924 user_a = test_users ["user_a" ]
2025 user_b = test_users ["user_b" ]
21-
26+
2227 enc_a = encrypt_text (test_secret , user_a )
2328 enc_b = encrypt_text (test_secret , user_b )
24-
29+
2530 assert enc_a != enc_b , "Ciphertexts must be different for different users (salting check)"
2631 assert len (enc_a ) > 0
2732 assert len (enc_b ) > 0
2833
2934 def test_decryption_isolation (self , test_users ):
3035 """Verify that a user cannot decrypt another user's ciphertext."""
3136 test_secret = "sk-ant-isolation-test"
32-
37+
3338 user_a = test_users ["user_a" ]
3439 user_b = test_users ["user_b" ]
35-
40+
3641 enc_a = encrypt_text (test_secret , user_a )
37-
42+
3843 # Successful decryption for owner
3944 dec_a = decrypt_text (enc_a , user_a )
4045 assert dec_a == test_secret
41-
46+
4247 # Failed decryption for other user
4348 dec_b = decrypt_text (enc_a , user_b )
4449 assert dec_b == "" , "User B should not be able to decrypt User A's data"
@@ -54,3 +59,97 @@ def test_invalid_ciphertext(self, test_users):
5459 """Verify that invalid ciphertexts return empty strings rather than crashing."""
5560 user_a = test_users ["user_a" ]
5661 assert decrypt_text ("not-a-valid-fernet-token" , user_a ) == ""
62+
63+
64+ class TestSlurmInjection :
65+ """Tests that shell metacharacters in API keys are safely escaped."""
66+
67+ def test_shlex_quote_prevents_injection (self ):
68+ """Malicious key values must be escaped before rendering into SLURM template."""
69+ from jinja2 import Template
70+
71+ template_text = Path (__file__ ).parent .parent / "ark" / "webapp" / "slurm_template.sh"
72+ template = Template (template_text .read_text ())
73+
74+ malicious_keys = {
75+ "claude_oauth_token" : '"; rm -rf / #' ,
76+ "gemini" : "$(whoami)" ,
77+ "openai" : "key with `backticks`" ,
78+ }
79+ safe_keys = {k : shlex .quote (v ) for k , v in malicious_keys .items ()}
80+
81+ rendered = template .render (
82+ project_id = "test" ,
83+ project_dir = "/tmp/test" ,
84+ log_dir = "/tmp/test/logs" ,
85+ mode = "paper" ,
86+ max_iterations = 2 ,
87+ partition = "batch" ,
88+ account = "" ,
89+ gres = "" ,
90+ cpus_per_task = 1 ,
91+ conda_env = "ark" ,
92+ api_keys = safe_keys ,
93+ )
94+
95+ # shlex.quote wraps values in single quotes — verify safe versions appear
96+ # and dangerous unquoted patterns do not
97+ assert "CLAUDE_CODE_OAUTH_TOKEN='\" " in rendered # single-quoted, not double-quoted
98+ assert "GEMINI_API_KEY='$(whoami)'" in rendered # $(whoami) inside single quotes = safe
99+ assert "OPENAI_API_KEY='key with `backticks`'" in rendered
100+
101+ # The raw values must NOT appear in double quotes (which would allow expansion)
102+ assert 'CLAUDE_CODE_OAUTH_TOKEN="' not in rendered
103+ assert 'GEMINI_API_KEY="' not in rendered
104+ assert 'OPENAI_API_KEY="' not in rendered
105+
106+
107+ class TestDbMigration :
108+ """Tests that the DB migration adds the encrypted_keys column."""
109+
110+ def test_migrate_adds_column (self ):
111+ """Column is added to an existing table missing it."""
112+ from ark .webapp .db import _migrate
113+ from sqlalchemy import create_engine , text
114+
115+ with tempfile .TemporaryDirectory () as tmp :
116+ db_path = Path (tmp ) / "test.db"
117+ conn = sqlite3 .connect (str (db_path ))
118+ conn .execute (
119+ "CREATE TABLE user (id TEXT PRIMARY KEY, email TEXT, name TEXT)"
120+ )
121+ conn .commit ()
122+ conn .close ()
123+
124+ engine = create_engine (f"sqlite:///{ db_path } " , echo = False )
125+ try :
126+ _migrate (engine )
127+
128+ with engine .connect () as c :
129+ rows = c .execute (text ("PRAGMA table_info(user)" )).fetchall ()
130+ cols = {row [1 ] for row in rows }
131+
132+ assert "encrypted_keys" in cols
133+ finally :
134+ engine .dispose ()
135+
136+ def test_migrate_idempotent (self ):
137+ """Running migration twice does not fail."""
138+ from ark .webapp .db import _migrate
139+ from sqlalchemy import create_engine , text
140+
141+ with tempfile .TemporaryDirectory () as tmp :
142+ db_path = Path (tmp ) / "test.db"
143+ conn = sqlite3 .connect (str (db_path ))
144+ conn .execute (
145+ "CREATE TABLE user (id TEXT PRIMARY KEY, email TEXT, encrypted_keys TEXT)"
146+ )
147+ conn .commit ()
148+ conn .close ()
149+
150+ engine = create_engine (f"sqlite:///{ db_path } " , echo = False )
151+ try :
152+ _migrate (engine ) # should not raise
153+ _migrate (engine ) # second call, still should not raise
154+ finally :
155+ engine .dispose ()
0 commit comments