1717from cryptography .hazmat .primitives import hashes , serialization
1818from cryptography .hazmat .primitives .asymmetric import rsa
1919from cryptography .hazmat .primitives .asymmetric .rsa import RSAPrivateKey
20- from cryptography .hazmat .primitives .serialization import load_pem_private_key , pkcs12
20+ from cryptography .hazmat .primitives .serialization import load_pem_private_key
2121from cryptography .x509 import Certificate
2222from cryptography .x509 .oid import NameOID
2323from platformdirs import user_cache_dir
@@ -29,25 +29,20 @@ class CertKey:
2929 cert : Certificate
3030 key : RSAPrivateKey
3131
32- def cert_bundle_as_pfx (self , password : bytes | None = None ) -> bytes :
33- if password is None :
34- password = b""
35-
36- return pkcs12 .serialize_key_and_certificates (
37- name = b"certificate" ,
38- key = self .key ,
39- cert = self .cert ,
40- cas = None ,
41- encryption_algorithm = serialization .BestAvailableEncryption (password )
42- if password
43- else serialization .NoEncryption (),
44- )
32+ def get_root_ca (self ) -> CertKey :
33+ if self .ca is None :
34+ return self
35+ return self .ca .get_root_ca ()
4536
4637 def cert_bundle_as_pem (self ):
4738 bundle = []
4839 bundle .append (self .cert .public_bytes (encoding = serialization .Encoding .PEM ).decode ("utf-8" ))
49- if self .ca is not None :
50- bundle .append (self .ca .cert_bundle_as_pem ())
40+ ca = self .ca
41+ while ca is not None :
42+ # We only append this CA cert if it isn't the root
43+ if ca .ca is not None :
44+ bundle .append (self .ca .cert_as_pem ())
45+ ca = ca .ca
5146 return "" .join (bundle )
5247
5348 def cert_as_pem (self ):
@@ -76,11 +71,10 @@ def to_json_mapping(self) -> dict[str, Any]:
7671 }
7772
7873
79- def get_ca (name , root_ca = None ) -> CertKey :
74+ def get_ca (name , issuing_ca = None ) -> CertKey :
8075 ca_filename = Path (user_cache_dir ("pytest-ess" , "element" )) / Path (name .lower ().replace (" " , "-" ))
8176 cert_path = ca_filename .with_suffix (".crt" )
8277 key_path = ca_filename .with_suffix (".key" )
83- bundle_path = (ca_filename .parent / (ca_filename .name + "-bundle" )).with_suffix (".pem" )
8478 if not ca_filename .parent .exists ():
8579 os .makedirs (ca_filename .parent , exist_ok = True )
8680 certkey = None
@@ -93,19 +87,25 @@ def get_ca(name, root_ca=None) -> CertKey:
9387 with open (cert_path , "rb" ) as pem_in :
9488 cert = x509 .load_pem_x509_certificate (pem_in .read (), default_backend ())
9589 if cert .not_valid_after_utc > pytz .UTC .localize (datetime .datetime .now ()):
96- certkey = CertKey (ca = root_ca , cert = cert , key = private_key )
90+ certkey = CertKey (ca = issuing_ca , cert = cert , key = private_key )
91+
9792 if not certkey :
98- certkey = generate_ca (name , root_ca )
93+ certkey = generate_ca (name , issuing_ca )
9994 with open (key_path , "wb" ) as pem_out :
10095 pem_out .write (certkey .key_as_pem ().encode ("utf-8" ))
10196 with open (cert_path , "wb" ) as pem_out :
10297 pem_out .write (certkey .cert_as_pem ().encode ("utf-8" ))
103- with open (bundle_path , "wb" ) as pem_out :
104- pem_out .write (certkey .cert_bundle_as_pem ().encode ("utf-8" ))
98+
99+ # Remove unused bundle - given we should only need to trust the root CA, that the tests will construct the
100+ # bundle appropriate for ingresses, and that a bundle of CA certs wasn't super useful this file was unneeded
101+ bundle_path = (ca_filename .parent / (ca_filename .name + "-bundle" )).with_suffix (".pem" )
102+ if bundle_path .exists ():
103+ bundle_path .unlink ()
104+
105105 return certkey
106106
107107
108- def generate_ca (name , root_ca = None ) -> CertKey :
108+ def generate_ca (name , issuing_ca = None ) -> CertKey :
109109 two_days = datetime .timedelta (2 , 0 , 0 )
110110 three_months = datetime .timedelta (90 , 0 , 0 )
111111 private_key = rsa .generate_private_key (public_exponent = 65537 , key_size = 2048 , backend = default_backend ())
@@ -120,8 +120,8 @@ def generate_ca(name, root_ca=None) -> CertKey:
120120 ]
121121 )
122122 )
123- if root_ca :
124- builder = builder .issuer_name (root_ca .cert .subject )
123+ if issuing_ca :
124+ builder = builder .issuer_name (issuing_ca .cert .subject )
125125 else :
126126 builder = builder .issuer_name (
127127 x509 .Name (
@@ -145,12 +145,12 @@ def generate_ca(name, root_ca=None) -> CertKey:
145145 critical = True ,
146146 )
147147 builder = builder .add_extension (x509 .SubjectKeyIdentifier .from_public_key (public_key ), critical = False )
148- if root_ca :
148+ if issuing_ca :
149149 builder = builder .add_extension (
150- x509 .AuthorityKeyIdentifier .from_issuer_public_key (root_ca .cert .public_key ()), critical = False
150+ x509 .AuthorityKeyIdentifier .from_issuer_public_key (issuing_ca .cert .public_key ()), critical = False
151151 )
152- certificate = builder .sign (root_ca .key , hashes .SHA256 (), default_backend ())
153- ca = CertKey (ca = root_ca , cert = certificate , key = private_key )
152+ certificate = builder .sign (issuing_ca .key , hashes .SHA256 (), default_backend ())
153+ ca = CertKey (ca = issuing_ca , cert = certificate , key = private_key )
154154 else :
155155 builder = builder .add_extension (x509 .AuthorityKeyIdentifier .from_issuer_public_key (public_key ), critical = False )
156156 certificate = builder .sign (private_key , hashes .SHA256 (), default_backend ())
0 commit comments