@@ -27,20 +27,63 @@ def _cluster_available(kubeconfig=""):
2727
2828SCRIPT = "provider-kubeconfig.py"
2929
30+ # All CLI elements that must appear in --help
31+ HELP_ELEMENTS = [
32+ "create" , "delete" , "update" , "extract" ,
33+ "namespace" ,
34+ "-k" , "--kubeconfig" ,
35+ "-s" , "--apiserverurl" ,
36+ "-f" , "--filename" ,
37+ "-x" , "--clustername" ,
38+ "-p" , "--permissionfile" ,
39+ "-c" , "--consumer" ,
40+ ]
41+
3042
3143class TestCli (unittest .TestCase ):
3244 """provider-kubeconfig.py exposes expected CLI (actions and flags)."""
3345
34- def test_help_shows_actions_and_flags (self ):
46+ def test_help_shows_all_actions (self ):
47+ root = os .path .abspath (os .path .join (os .path .dirname (__file__ ), ".." ))
48+ proc = subprocess .run (
49+ [sys .executable , os .path .join (root , SCRIPT ), "--help" ],
50+ capture_output = True , text = True , cwd = root ,
51+ )
52+ self .assertEqual (proc .returncode , 0 , proc .stderr )
53+ out = proc .stdout or ""
54+ for elem in ["create" , "delete" , "update" , "extract" ]:
55+ self .assertIn (elem , out , f"Action { elem } should appear in help" )
56+
57+ def test_help_shows_all_flags (self ):
58+ root = os .path .abspath (os .path .join (os .path .dirname (__file__ ), ".." ))
59+ proc = subprocess .run (
60+ [sys .executable , os .path .join (root , SCRIPT ), "--help" ],
61+ capture_output = True , text = True , cwd = root ,
62+ )
63+ self .assertEqual (proc .returncode , 0 )
64+ out = proc .stdout or ""
65+ for elem in HELP_ELEMENTS :
66+ self .assertIn (elem , out , f"Help should mention { elem } " )
67+
68+ def test_help_shows_namespace_argument (self ):
3569 root = os .path .abspath (os .path .join (os .path .dirname (__file__ ), ".." ))
3670 proc = subprocess .run (
3771 [sys .executable , os .path .join (root , SCRIPT ), "--help" ],
3872 capture_output = True , text = True , cwd = root ,
3973 )
4074 self .assertEqual (proc .returncode , 0 )
4175 out = proc .stdout or ""
42- for x in ["create" , "delete" , "update" , "extract" , "-k" , "-s" , "-c" , "namespace" ]:
43- self .assertIn (x , out )
76+ self .assertIn ("namespace" , out .lower ())
77+
78+ def test_update_without_permissionfile_exits_with_error (self ):
79+ """update action without -p exits with code 1."""
80+ root = os .path .abspath (os .path .join (os .path .dirname (__file__ ), ".." ))
81+ proc = subprocess .run (
82+ [sys .executable , os .path .join (root , SCRIPT ), "update" , "default" ],
83+ capture_output = True , text = True , cwd = root ,
84+ )
85+ self .assertNotEqual (proc .returncode , 0 )
86+ self .assertIn ("permission" , (proc .stdout or proc .stderr or "" ).lower ())
4487
4588
4689class TestKubeconfigIntegration (unittest .TestCase ):
@@ -58,7 +101,7 @@ def setUpClass(cls):
58101 cls .has_cluster = _cluster_available (cls .kubeconfig )
59102 cls .kubeconfig_flag = " --kubeconfig=" + cls .kubeconfig if cls .kubeconfig else ""
60103
61- def _create_and_get_kubeconfig (self , root , ns , sa = "kubeplus-saas-provider" , extra_args = None ):
104+ def _create_and_get_kubeconfig (self , root , ns , sa = "kubeplus-saas-provider" , extra_args = None , output_filename = None ):
62105 """Run create, return (kubeconfig_dict, proc). Caller must delete to cleanup."""
63106 create_args = ["create" , ns ]
64107 if sa != "kubeplus-saas-provider" :
@@ -71,7 +114,9 @@ def _create_and_get_kubeconfig(self, root, ns, sa="kubeplus-saas-provider", extr
71114 )
72115 if proc .returncode != 0 :
73116 return None , proc
74- filename = sa + ".json"
117+ filename = output_filename or (sa + ".json" )
118+ if not filename .endswith (".json" ):
119+ filename += ".json"
75120 kubeconfig_path = os .path .join (root , filename )
76121 if not os .path .exists (kubeconfig_path ):
77122 return None , proc
@@ -88,44 +133,82 @@ def _delete_for_cleanup(self, root, ns, sa="kubeplus-saas-provider"):
88133 cwd = root , capture_output = True , timeout = 60 ,
89134 )
90135
91- def _assert_kubeconfig_valid (self , cfg , expected_server = None , expected_cluster_name = None , expected_namespace = None ):
92- self .assertIsNotNone (cfg )
93- self .assertEqual (cfg .get ("apiVersion" ), "v1" )
94- self .assertEqual (cfg .get ("kind" ), "Config" )
136+ def _assert_kubeconfig_valid (
137+ self ,
138+ cfg ,
139+ expected_server = None ,
140+ expected_cluster_name = None ,
141+ expected_namespace = None ,
142+ expected_user_name = None ,
143+ ):
144+ """Assert all kubeconfig fields are non-empty and optionally match expected values."""
145+ self .assertIsNotNone (cfg , "kubeconfig should not be None" )
146+
147+ # Top-level
148+ self .assertEqual (cfg .get ("apiVersion" ), "v1" , "apiVersion should be v1" )
149+ self .assertEqual (cfg .get ("kind" ), "Config" , "kind should be Config" )
150+ self .assertTrue (cfg .get ("current-context" ), "current-context should be non-empty" )
151+
152+ # Users
95153 self .assertTrue (cfg .get ("users" ), "users should be non-empty" )
154+ user = cfg ["users" ][0 ]
155+ self .assertTrue (user .get ("name" ), "users[0].name should be non-empty" )
156+ self .assertTrue (user .get ("user" ), "users[0].user should be non-empty" )
157+ token = user .get ("user" , {}).get ("token" )
158+ self .assertTrue (token , "users[0].user.token should be non-empty" )
159+ if expected_user_name :
160+ self .assertEqual (user .get ("name" ), expected_user_name )
161+
162+ # Clusters
96163 self .assertTrue (cfg .get ("clusters" ), "clusters should be non-empty" )
97- self .assertTrue (cfg .get ("contexts" ), "contexts should be non-empty" )
98- token = cfg ["users" ][0 ].get ("user" , {}).get ("token" )
99- self .assertTrue (token , "token should be non-empty" )
100- server = cfg ["clusters" ][0 ].get ("cluster" , {}).get ("server" )
101- self .assertTrue (server , "cluster server should be non-empty" )
164+ cluster_entry = cfg ["clusters" ][0 ]
165+ self .assertTrue (cluster_entry .get ("name" ), "clusters[0].name should be non-empty" )
166+ self .assertTrue (cluster_entry .get ("cluster" ), "clusters[0].cluster should be non-empty" )
167+ cluster = cluster_entry ["cluster" ]
168+ self .assertTrue (cluster .get ("server" ), "clusters[0].cluster.server should be non-empty" )
169+ self .assertIn ("insecure-skip-tls-verify" , cluster , "cluster should have insecure-skip-tls-verify" )
102170 if expected_server :
103- self .assertEqual (server , expected_server )
171+ self .assertEqual (cluster .get ("server" ), expected_server )
172+
173+ # Contexts
174+ self .assertTrue (cfg .get ("contexts" ), "contexts should be non-empty" )
175+ ctx_entry = cfg ["contexts" ][0 ]
176+ self .assertTrue (ctx_entry .get ("name" ), "contexts[0].name should be non-empty" )
177+ self .assertTrue (ctx_entry .get ("context" ), "contexts[0].context should be non-empty" )
178+ ctx = ctx_entry ["context" ]
179+ self .assertTrue (ctx .get ("cluster" ), "contexts[0].context.cluster should be non-empty" )
180+ self .assertTrue (ctx .get ("user" ), "contexts[0].context.user should be non-empty" )
181+ self .assertTrue (ctx .get ("namespace" ), "contexts[0].context.namespace should be non-empty" )
104182 if expected_cluster_name :
105- ctx_name = cfg .get ("current-context" ) or ( cfg [ "contexts" ][ 0 ]. get ( "name" ) if cfg [ "contexts" ] else "" )
106- self .assertEqual (ctx_name , expected_cluster_name )
183+ self . assertEqual ( cfg .get ("current-context" ), expected_cluster_name )
184+ self .assertEqual (ctx_entry . get ( "name" ) , expected_cluster_name )
107185 if expected_namespace :
108- ctx_ns = cfg ["contexts" ][0 ].get ("context" , {}).get ("namespace" , "" )
109- self .assertEqual (ctx_ns , expected_namespace )
186+ self .assertEqual (ctx .get ("namespace" ), expected_namespace )
110187
111188 def _sa_exists (self , sa , ns ):
112189 out , err = _run_command ("kubectl get sa " + sa + " -n " + ns + self .kubeconfig_flag )
113190 return out and sa in out and "NotFound" not in (err or "" )
114191
115- def test_provider_kubeconfig_has_nonempty_fields (self ):
192+ def test_provider_kubeconfig_all_fields_nonempty (self ):
193+ """Provider kubeconfig: every field that should exist is non-empty."""
116194 if not self .has_cluster :
117195 self .skipTest ("No cluster reachable (set KUBECONFIG)" )
118196 root = os .path .abspath (os .path .join (os .path .dirname (__file__ ), ".." ))
119197 ns = "kubeplus-test-prov-" + str (os .getpid ())
120198 try :
121199 cfg , proc = self ._create_and_get_kubeconfig (root , ns )
122200 self .assertEqual (proc .returncode , 0 , proc .stderr )
123- self ._assert_kubeconfig_valid (cfg , expected_namespace = ns )
201+ self ._assert_kubeconfig_valid (
202+ cfg ,
203+ expected_namespace = ns ,
204+ expected_user_name = "kubeplus-saas-provider" ,
205+ )
124206 self .assertTrue (self ._sa_exists ("kubeplus-saas-provider" , ns ))
125207 finally :
126208 self ._delete_for_cleanup (root , ns )
127209
128- def test_consumer_kubeconfig_has_namespace_in_context (self ):
210+ def test_consumer_kubeconfig_all_fields_nonempty (self ):
211+ """Consumer kubeconfig: every field non-empty, user name matches SA, namespace in context."""
129212 if not self .has_cluster :
130213 self .skipTest ("No cluster reachable (set KUBECONFIG)" )
131214 root = os .path .abspath (os .path .join (os .path .dirname (__file__ ), ".." ))
@@ -134,16 +217,78 @@ def test_consumer_kubeconfig_has_namespace_in_context(self):
134217 try :
135218 cfg , proc = self ._create_and_get_kubeconfig (root , ns , sa = consumer_sa )
136219 self .assertEqual (proc .returncode , 0 , proc .stderr )
137- self ._assert_kubeconfig_valid (cfg , expected_namespace = ns )
220+ self ._assert_kubeconfig_valid (
221+ cfg ,
222+ expected_namespace = ns ,
223+ expected_user_name = consumer_sa ,
224+ )
138225 self .assertTrue (self ._sa_exists (consumer_sa , ns ))
139226 finally :
140227 self ._delete_for_cleanup (root , ns , sa = consumer_sa )
141228
142- def test_cli_flags_reflected_in_kubeconfig (self ):
229+ def test_flag_s_apiserverurl_reflected_in_kubeconfig (self ):
230+ """-s/--apiserverurl sets cluster server in kubeconfig."""
231+ if not self .has_cluster :
232+ self .skipTest ("No cluster reachable (set KUBECONFIG)" )
233+ root = os .path .abspath (os .path .join (os .path .dirname (__file__ ), ".." ))
234+ ns = "kubeplus-test-s-" + str (os .getpid ())
235+ test_server = "https://api.example.com:6443"
236+ try :
237+ cfg , proc = self ._create_and_get_kubeconfig (
238+ root , ns ,
239+ extra_args = ["-s" , test_server ],
240+ )
241+ self .assertEqual (proc .returncode , 0 , proc .stderr )
242+ self ._assert_kubeconfig_valid (cfg , expected_server = test_server , expected_namespace = ns )
243+ finally :
244+ self ._delete_for_cleanup (root , ns )
245+
246+ def test_flag_x_clustername_reflected_in_kubeconfig (self ):
247+ """-x/--clustername sets context name and cluster name in kubeconfig."""
248+ if not self .has_cluster :
249+ self .skipTest ("No cluster reachable (set KUBECONFIG)" )
250+ root = os .path .abspath (os .path .join (os .path .dirname (__file__ ), ".." ))
251+ ns = "kubeplus-test-x-" + str (os .getpid ())
252+ test_cluster = "my-test-cluster"
253+ try :
254+ cfg , proc = self ._create_and_get_kubeconfig (
255+ root , ns ,
256+ extra_args = ["-x" , test_cluster ],
257+ )
258+ self .assertEqual (proc .returncode , 0 , proc .stderr )
259+ self ._assert_kubeconfig_valid (
260+ cfg ,
261+ expected_cluster_name = test_cluster ,
262+ expected_namespace = ns ,
263+ )
264+ finally :
265+ self ._delete_for_cleanup (root , ns )
266+
267+ def test_flag_f_filename_uses_custom_output_file (self ):
268+ """-f/--filename writes kubeconfig to specified file."""
269+ if not self .has_cluster :
270+ self .skipTest ("No cluster reachable (set KUBECONFIG)" )
271+ root = os .path .abspath (os .path .join (os .path .dirname (__file__ ), ".." ))
272+ ns = "kubeplus-test-f-" + str (os .getpid ())
273+ custom_name = "custom-provider-kubeconfig"
274+ try :
275+ cfg , proc = self ._create_and_get_kubeconfig (
276+ root , ns ,
277+ extra_args = ["-f" , custom_name ],
278+ output_filename = custom_name ,
279+ )
280+ self .assertEqual (proc .returncode , 0 , proc .stderr )
281+ self ._assert_kubeconfig_valid (cfg , expected_namespace = ns )
282+ self .assertTrue (os .path .exists (os .path .join (root , custom_name + ".json" )))
283+ finally :
284+ self ._delete_for_cleanup (root , ns )
285+
286+ def test_flags_s_and_x_combined (self ):
287+ """-s and -x together: both server and cluster name in kubeconfig."""
143288 if not self .has_cluster :
144289 self .skipTest ("No cluster reachable (set KUBECONFIG)" )
145290 root = os .path .abspath (os .path .join (os .path .dirname (__file__ ), ".." ))
146- ns = "kubeplus-test-flags -" + str (os .getpid ())
291+ ns = "kubeplus-test-sx -" + str (os .getpid ())
147292 test_server = "https://api.example.com:6443"
148293 test_cluster = "my-test-cluster"
149294 try :
0 commit comments