Skip to content

certified.cert — Certified

The Certified class is the main entry point for the library. It reads a config directory and provides:

  • SSL context creation for mTLS clients and servers
  • HTTP client context managers: sync (Client, httpx), async-httpx (AsyncClient), async-aiohttp (ClientSession)
  • A uvicorn-based HTTPS server launcher
  • Management of known clients, services, and identities

Certified

Source code in certified/cert.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
class Certified:
    """ Main entry point: binds one configuration directory to
        mTLS contexts, HTTP clients and an HTTPS server.

        Paths used by the methods below, relative to `self.config`:

          - `CA`, `id`: paths handed to `CA.load` / `LeafCert.load`
          - `id.crt`, `id.key`: read directly when a service lists `auths`
          - `id/<signer>.crt`, `CA/<signer>.crt`: certificates looked up
            by signer name in `get_chain_from`
          - `known_servers/`, `known_clients/`: passed to `configure_capath`;
            `known_servers/<name>.yaml` holds `TrustedService` entries
    """

    def __init__(self, certified_config : Optional[Pstr] = None):
        """ Resolve the configuration directory through `layout.config`. """
        self.config = layout.config(certified_config)

    def signer(self):
        """ Load the signing authority via `CA.load` from `<config>/CA`. """
        return CA.load(self.config / "CA")

    def identity(self):
        """ Load this entity's certificate via `LeafCert.load` from `<config>/id`. """
        return LeafCert.load(self.config / "id")

    def lookup_server(self, name) -> Optional[TrustedService]:
        """ Check if the server is known.

            If so, return the service's config, parsed from
            `known_servers/<name>.yaml`.  Returns None when
            that file does not exist.
        """
        server = self.config / "known_servers" / f"{name}.yaml"
        if not server.exists():
            return None
        with open(server, "r", encoding="utf-8") as f:
            cfg = yaml.safe_load(f)
        return TrustedService.model_validate(cfg)

    def get_chain_from(self, auths : List[str]) -> Tuple[bytes, bytes]:
        """ Get a certificate chain from
            any of the signers to either "id.crt" (preferred)
            or "CA.crt" -> "id.crt" (second choice).

            Files are matched by name: `id/<signer>.crt` is tried
            first, then `CA/<signer>.crt`.

            Returns the appropriate end-entity certificate
            along with a concatenation of all the certificates
            in the chain (empty when the end-entity certificate
            was signed directly).

            Raises:
              KeyError: if no `.crt` file named after any of
                        `auths` is found in either directory.
        """
        signers = set(auths)
        if (self.config / "id").is_dir():
            for fname in (self.config / "id").iterdir():
                if fname.suffix != ".crt":
                    continue
                if fname.stem in signers:
                    # ee-cert signed directly by a signer, just return it.
                    _logger.debug("Found end-entity signature from %s", fname.name)
                    return fname.read_bytes(), b""

        if (self.config / "CA").is_dir():
            for fname in (self.config / "CA").iterdir():
                if fname.suffix != ".crt":
                    continue
                if fname.stem in signers:
                    # CA was signed by signer. Return my id.crt and the CA chain.
                    crt = (self.config / "id.crt").read_bytes()
                    _logger.debug("Found CA signature from %s", fname.name)
                    return crt, fname.read_bytes()

        raise KeyError(f"Unable to find any certificate signed from any authorizers in ({signers}).")

    def ssl_context(self,
                    is_client : bool,
                    srv : Optional[TrustedService] = None
                   ) -> ssl.SSLContext:
        """ Build the SSL context for a client or a server.

            Args:
              is_client: True for an outgoing (client) context,
                         False for a server context.
              srv: optional known-service entry; only valid for clients.
                   When it lists `auths`, the presented certificate is
                   chosen with `get_chain_from`; when it carries `cert`,
                   only that certificate is loaded for verification.

            Raises:
              AssertionError: if `srv` is given for a server context, or
                              `id.key` is not reported as secret by `Blob`.
              KeyError: propagated from `get_chain_from`.
        """
        if srv is not None:
            assert is_client, "Must be client to use TrustedService cfg."
        ctx = ssl_context(is_client)
        if not srv or len(srv.auths) == 0:
            _logger.debug("Will authenticate using id.crt")
            self.identity().configure_cert(ctx)
        else: # lookup any signature trusted by the server
            crt, chain = self.get_chain_from(srv.auths)
            keyfile = self.config / "id.key"
            key  = Blob.read(keyfile)
            if not key.is_secret:
                # Raised explicitly rather than with `assert`, so the
                # permission check is not stripped under `python -O`.
                raise AssertionError(f"{keyfile} has compromised file permissions.")
            LeafCert(crt, key.bytes(), chain_to_ca=[chain]).configure_cert(ctx)

        if is_client:
            if srv is None or srv.cert is None:
                configure_capath(ctx, self.config/"known_servers")
            else:
                # Use the server's specific certificate.
                _logger.debug("Requiring specific certificate for known service at %s", srv.url)
                pem = cert_to_pem( b64_to_cert(srv.cert) )
                ctx.load_verify_locations(cadata=pem)
        else:
            configure_capath(ctx, self.config/"known_clients")
        return ctx

    def add_client(self,
                   name: Pstr,
                   cert: x509.Certificate,
                   overwrite: bool = False) -> None:
        """ Add the certificate to `known_clients`
            with the given name.

            Raises:
              FileExistsError: if `known_clients/<name>.crt` exists
                               and `overwrite` is False.
        """
        fname = self.config / "known_clients" / f"{str(name)}.crt"
        if not overwrite and fname.exists():
            raise FileExistsError(fname)
        fname.write_text(cert_to_pem(cert))

    def add_service(self,
                    name : Pstr,
                    srv  : TrustedService,
                    overwrite:bool = False) -> None:
        """ Add the certificate to `known_servers`
            with the given info.  See `TrustedService`
            for documentation on the attributes.

            The service URL must have a netloc and no params,
            query or fragment.

            Raises:
              AssertionError: if the service URL breaks the rules above.
              FileExistsError: if `known_servers/<name>.yaml` exists
                               and `overwrite` is False.
        """
        parse = urlparse(srv.url)
        assert parse.netloc != "", "Service URL must define a hostname!"
        assert parse.params == "", "Service URL must not use params."
        assert parse.query  == "", "Service URL must not use query."
        assert parse.fragment == "", "Service URL must not use fragment."

        fname = self.config / "known_servers" / f"{str(name)}.yaml"
        if not overwrite and fname.exists():
            raise FileExistsError(fname)
        fname.write_text( yaml.dump( srv.model_dump() ) )

    def add_identity(self,
                     signed_cert: x509.Certificate,
                     ca_cert: x509.Certificate,
                     overwrite: bool = False) -> None:
        """ Add the signed certificate to the `id/` subdirectory.

        Args:
            signed_cert: identity to add (to send to servers who recognize it)
            ca_cert: certificate that signed this identity
                     (will be used to name the signed identity)
            overwrite: replace an existing file of the same name

        Returns: None

        Raises:
            AssertionError: If `signed_cert` is itself a CA certificate.
            FileExistsError: If the target file exists and `overwrite`
                        is False.
            ValueError: If the issuer name on the certificate does not match
                        the subject name of the issuer or the signature
                        algorithm is unsupported.
            TypeError: If the issuer does not have a supported public key
                       type.
            cryptography.exceptions.InvalidSignature: If the signature fails
                       to verify.
        """
        assert not get_is_ca(signed_cert), "Cannot use this ca certificate as a client identity."

        # check that the ca_cert actually issued this certificate
        signed_cert.verify_directly_issued_by(ca_cert)

        xname = rfc4514name(ca_cert.subject)

        (self.config / "id").mkdir(exist_ok=True, parents=True)
        fname = self.config / "id" / f"{xname}.crt"
        if not overwrite and fname.exists():
            raise FileExistsError(fname)
        fname.write_text(cert_to_pem(signed_cert))

    def lookup_public_key(self, kid: int) -> bis.PublicKey:
        """ Return the signer's public key as a biscuit public key.

            `kid` is currently ignored (see FIXME): the key of
            `self.signer()` is always used.

            Raises:
              TypeError: if the key cannot be converted for biscuit use.
        """
        # FIXME: use key serial numbers
        pub = self.signer().pubkey
        alg = cert_key_to_biscuit_alg(pub)
        raw = cert_pubkey_to_biscuit_bytes(pub)
        if alg is None or raw is None:
            raise TypeError(
                f"Key type {type(pub).__name__} is not supported "
                "for biscuit verification (supported: ed25519, secp256r1)"
            )
        return bis.PublicKey.from_bytes(raw, alg=alg)  # type: ignore

    #def biscuit(self, token : str) -> bis.Biscuit:
    #    return bis.Biscuit.from_base64(token, self.lookup_public_key)

    @classmethod
    def new(cls,
            name : x509.Name,
            san : x509.SubjectAlternativeName,
            certified_config : Optional[Pstr] = None,
            key_type : KeyType = KeyType.ed25519,
            overwrite : bool = False,
           ) -> "Certified":
        """ Create a new CA and identity certificate

        Args:
          name: the distinguished name for the signing key
          san:   subject alternate name fields for the entity certificate
          certified_config: base directory to output the new identity
          key_type: key type passed to both `CA.new` and `ca.leaf_cert`
          overwrite: if True, any existing files will be deleted first

        Raises:
          FileExistsError: if `overwrite` is False and the directory
                           exists but could not be removed as empty.
        """
        ca    = CA.new(append_pseudonym(name, "Signing Certificate"), key_type=key_type)
        ident = ca.leaf_cert(name, san, key_type=key_type)

        cfg = layout.config(certified_config, False)
        if overwrite: # remove existing config!
            try:
                shutil.rmtree(cfg)
            except FileNotFoundError:
                pass
        else:
            try:
                cfg.rmdir() # only succeeds if dir. is empty
            except FileNotFoundError: # not created yet - OK
                pass
            except OSError:
                raise FileExistsError(cfg)
        cfg.mkdir(exist_ok=True, parents=True)

        ca.save(cfg / "CA", False)
        ident.save(cfg / "id", False)

        # A fresh configuration trusts its own CA in both directions.
        (cfg/"known_servers").mkdir()
        (cfg/"known_clients").mkdir()
        shutil.copy(cfg/"CA.crt", cfg/"known_servers"/"self.crt")
        shutil.copy(cfg/"CA.crt", cfg/"known_clients"/"self.crt")
        return cls(cfg)

    def _resolve_service(self, base_url : str) -> Tuple[Optional[TrustedService], str]:
        """ Shared URL handling for the three client factories.

            Returns the matching known service (or None) together with
            the base URL to connect to, rewritten by `replace_baseurl`
            when a service was found.
        """
        url = urlparse(base_url)
        assert url.port is None \
                or url.netloc == f"{url.hostname}:{url.port}", "URL's netloc must define only a hostname and port."

        # An empty or relative base_url has no hostname; skip the lookup
        # instead of probing for "known_servers/None.yaml".
        srv = None
        if url.hostname is not None:
            srv = self.lookup_server(url.hostname)
        if srv:
            url = replace_baseurl(url, srv.url)

        new_base = urlunparse(url)
        if srv:
            _logger.debug("Replaced %s with %s", base_url, new_base)
        return srv, new_base

    @asynccontextmanager
    async def ClientSession(self, base_url: str = "", **kws):
        """ Create an aiohttp.ClientSession context
            that includes the current identity within
            its ssl context (connector=...).

            If the hostname of `base_url` names a known server,
            the base URL is rewritten to that service's URL.
            Extra keyword arguments go to `aiohttp.ClientSession`.
        """
        assert aiohttp is not None, "aiohttp is not available."

        srv, new_base = self._resolve_service(base_url)
        ssl_ctx = self.ssl_context(True, srv)
        conn = aiohttp.TCPConnector(ssl=ssl_ctx)
        async with aiohttp.ClientSession(base_url = new_base,
                                         connector = conn,
                                         **kws) as client:
            yield client

    @contextmanager
    def Client(self, base_url : str = "", headers : Optional[Dict[str,str]] = None):
        """ Create an httpx.Client context
            that includes the current identity within
            its ssl context.

            Use this for synchronous code. For async code see
            [`AsyncClient`][certified.cert.Certified.AsyncClient] (httpx) or
            [`ClientSession`][certified.cert.Certified.ClientSession] (aiohttp).

            If the hostname of `base_url` names a known server,
            the base URL is rewritten to that service's URL.
            `headers` defaults to no extra headers.
        """
        assert httpx is not None, "httpx is not available."

        srv, new_base = self._resolve_service(base_url)
        ssl_ctx = self.ssl_context(True, srv)
        with httpx.Client(base_url = new_base,
                          headers = {} if headers is None else headers,
                          verify = ssl_ctx) as client:
            yield client

    @asynccontextmanager
    async def AsyncClient(self, base_url : str = "", headers : Optional[Dict[str,str]] = None):
        """ Create an httpx.AsyncClient context
            that includes the current identity within
            its ssl context.

            Use this for async code when you need an `httpx.AsyncClient` —
            for example when integrating with frameworks such as A2A, LangChain,
            or any library that accepts `httpx.AsyncClient` directly.

            For aiohttp-based async code (e.g. existing tests or WebSocket
            support) use [`ClientSession`][certified.cert.Certified.ClientSession]
            instead.

            If the hostname of `base_url` names a known server,
            the base URL is rewritten to that service's URL.
            `headers` defaults to no extra headers.

            Example:
            ```python
            cert = Certified()
            async with cert.AsyncClient("https://my-api:8443") as http:
                r = await http.get("/notes")
                r.raise_for_status()
                print(r.json())
            ```
        """
        assert httpx is not None, "httpx is not available."

        srv, new_base = self._resolve_service(base_url)
        ssl_ctx = self.ssl_context(True, srv)
        async with httpx.AsyncClient(base_url = new_base,
                                     headers = {} if headers is None else headers,
                                     verify = ssl_ctx) as client:
            yield client

    def serve(self,
              app : Any,
              url_str : str,
              loki : Optional[Pstr] = None,
              get_passwd : PWCallback = None) -> None:
        """ Run `app` under uvicorn over HTTPS until the server exits.

            Args:
              app: application object handed to `uvicorn.Config`
              url_str: must be exactly `https://<host>:<port>`
              loki: forwarded to `capture_logs`
              get_passwd: accepted but not used by this method

            Raises:
              ValueError: if the URL scheme is not `https`.
              AssertionError: if the URL has no host or port, carries
                              anything beyond them, or uvicorn is missing.
        """
        url = urlparse(url_str)
        if url.scheme != "https":
            raise ValueError(f"Unsupported URL scheme: {url.scheme}")

        assert url.hostname is not None, "URL must define a hostname."
        assert url.port is not None, "URL must define a port."
        assert url.netloc == f"{url.hostname}:{url.port}", "URL's netloc must define only a hostname and port."
        assert url.path == "", "Cannot serve a sub-path."
        assert url.params == "", "Cannot handle URL parameters."
        assert url.query == "", "Cannot serve specific query."
        assert url.fragment == "", "Cannot serve specific fragment"

        assert uvicorn is not None, "uvicorn is not available."

        # No ssl_* options are passed here: the context built by
        # ssl_context() is installed on the config after load().
        config = uvicorn.Config(
                    app,
                    host = url.hostname,
                    port = url.port,
                    log_level = "info",
                    http = "h11")

        config.load() # https://github.com/encode/uvicorn/discussions/2339
        _logger.debug("Using Certified's custom ssl context.")
        config.ssl = self.ssl_context(False)

        capture_logs(str(app), loki)
        server = uvicorn.Server(config)
        asyncio.run( server.serve() )

AsyncClient(base_url='', headers={}) async

Create an httpx.AsyncClient context that includes the current identity within its ssl context.

Use this for async code when you need an httpx.AsyncClient — for example when integrating with frameworks such as A2A, LangChain, or any library that accepts httpx.AsyncClient directly.

For aiohttp-based async code (e.g. existing tests or WebSocket support) use ClientSession instead.

Example:

cert = Certified()
async with cert.AsyncClient("https://my-api:8443") as http:
    r = await http.get("/notes")
    r.raise_for_status()
    print(r.json())
Source code in certified/cert.py
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
@asynccontextmanager
async def AsyncClient(self, base_url : str = "", headers : Optional[Dict[str,str]] = None):
    """ Create an httpx.AsyncClient context
        that includes the current identity within
        its ssl context.

        Use this for async code when you need an `httpx.AsyncClient` —
        for example when integrating with frameworks such as A2A, LangChain,
        or any library that accepts `httpx.AsyncClient` directly.

        For aiohttp-based async code (e.g. existing tests or WebSocket
        support) use [`ClientSession`][certified.cert.Certified.ClientSession]
        instead.

        If the hostname of `base_url` names a known server, the base
        URL is rewritten to that service's URL before connecting.
        `headers` defaults to no extra headers.

        Example:
        ```python
        cert = Certified()
        async with cert.AsyncClient("https://my-api:8443") as http:
            r = await http.get("/notes")
            r.raise_for_status()
            print(r.json())
        ```
    """
    assert httpx is not None, "httpx is not available."

    url = urlparse(base_url)
    assert url.port is None \
            or url.netloc == f"{url.hostname}:{url.port}", "URL's netloc must define only a hostname and port."

    # An empty or relative base_url has no hostname; skip the lookup
    # instead of probing for "known_servers/None.yaml".
    srv = None
    if url.hostname is not None:
        srv = self.lookup_server(url.hostname)
    if srv:
        url = replace_baseurl(url, srv.url)

    new_base = urlunparse(url)
    if srv:
        _logger.debug("Replaced %s with %s", base_url, new_base)

    ssl_ctx = self.ssl_context(True, srv)
    async with httpx.AsyncClient(base_url = new_base,
                                 headers = {} if headers is None else headers,
                                 verify = ssl_ctx) as client:
        yield client

Client(base_url='', headers={})

Create an httpx.Client context that includes the current identity within its ssl context.

Use this for synchronous code. For async code see AsyncClient (httpx) or ClientSession (aiohttp).

Source code in certified/cert.py
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
@contextmanager
def Client(self, base_url : str = "", headers : Optional[Dict[str,str]] = None):
    """ Create an httpx.Client context
        that includes the current identity within
        its ssl context.

        Use this for synchronous code. For async code see
        [`AsyncClient`][certified.cert.Certified.AsyncClient] (httpx) or
        [`ClientSession`][certified.cert.Certified.ClientSession] (aiohttp).

        If the hostname of `base_url` names a known server, the base
        URL is rewritten to that service's URL before connecting.
        `headers` defaults to no extra headers.
    """
    assert httpx is not None, "httpx is not available."

    # Check whether this server corresponds to
    # a known host.
    url = urlparse(base_url)
    assert url.port is None \
            or url.netloc == f"{url.hostname}:{url.port}", "URL's netloc must define only a hostname and port."

    # An empty or relative base_url has no hostname; skip the lookup
    # instead of probing for "known_servers/None.yaml".
    srv = None
    if url.hostname is not None:
        srv = self.lookup_server(url.hostname)
    if srv:
        url = replace_baseurl(url, srv.url)

    new_base = urlunparse(url)
    if srv:
        _logger.debug("Replaced %s with %s", base_url, new_base)

    ssl_ctx = self.ssl_context(True, srv)
    with httpx.Client(base_url = new_base,
                      headers = {} if headers is None else headers,
                      verify = ssl_ctx) as client:
        yield client

ClientSession(base_url='', **kws) async

Create an aiohttp.ClientSession context that includes the current identity within its ssl context (connector=...).

Source code in certified/cert.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
@asynccontextmanager
async def ClientSession(self, base_url: str = "", **kws):
    """ Create an aiohttp.ClientSession context
        that includes the current identity within
        its ssl context (connector=...).

        If the hostname of `base_url` names a known server, the base
        URL is rewritten to that service's URL before connecting.
        Extra keyword arguments are passed to `aiohttp.ClientSession`.
    """
    assert aiohttp is not None, "aiohttp is not available."

    # Check whether this server corresponds to a known host.
    url = urlparse(base_url)
    assert url.port is None \
            or url.netloc == f"{url.hostname}:{url.port}", "URL's netloc must define only a hostname and port."

    # An empty or relative base_url has no hostname; skip the lookup
    # instead of probing for "known_servers/None.yaml".
    srv = None
    if url.hostname is not None:
        srv = self.lookup_server(url.hostname)
    if srv:
        url = replace_baseurl(url, srv.url)

    new_base = urlunparse(url)
    if srv:
        _logger.debug("Replaced %s with %s", base_url, new_base)

    ssl_ctx = self.ssl_context(True, srv)
    conn = aiohttp.TCPConnector(ssl=ssl_ctx)
    async with aiohttp.ClientSession(base_url = new_base,
                                     connector = conn,
                                     **kws) as client:
        yield client

add_client(name, cert, overwrite=False)

Add the certificate to known_clients with the given name.

Source code in certified/cert.py
173
174
175
176
177
178
179
180
181
182
183
def add_client(self,
               name: Pstr,
               cert: x509.Certificate,
               overwrite: bool = False) -> None:
    """ Write `cert`, PEM-encoded, to `known_clients/<name>.crt`.

        Raises:
          FileExistsError: when the file is already present
                           and `overwrite` is False.
    """
    target = self.config / "known_clients" / (str(name) + ".crt")
    if not overwrite:
        # Refuse to clobber an existing entry unless asked to.
        if target.exists():
            raise FileExistsError(target)
    pem = cert_to_pem(cert)
    target.write_text(pem)

add_identity(signed_cert, ca_cert, overwrite=False)

Add the signed certificate to the id/ subdirectory.

Parameters:

signed_cert: identity to add (to send to servers who recognize it)

ca_cert: certificate that signed this identity (will be used to name the signed identity)

Returns: None

Raises:

Type Description
ValueError

If the issuer name on the certificate does not match the subject name of the issuer or the signature algorithm is unsupported.

TypeError

If the issuer does not have a supported public key type.

InvalidSignature

If the signature fails to verify.

Source code in certified/cert.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
def add_identity(self,
                 signed_cert: x509.Certificate,
                 ca_cert: x509.Certificate,
                 overwrite: bool = False) -> None:
    """ Store an identity certificate under `id/`, named after its issuer.

    Args:
        signed_cert: identity to add (to send to servers who recognize it)
        ca_cert: certificate that signed this identity; its subject,
                 rendered by `rfc4514name`, becomes the file name
        overwrite: replace an existing file of the same name

    Returns: None

    Raises:
        AssertionError: If `signed_cert` is itself a CA certificate.
        FileExistsError: If the target file exists and `overwrite`
                    is False.
        ValueError: If the issuer name on the certificate does not match
                    the subject name of the issuer or the signature
                    algorithm is unsupported.
        TypeError: If the issuer does not have a supported public key
                   type.
        cryptography.exceptions.InvalidSignature: If the signature fails
                   to verify.
    """
    is_authority = get_is_ca(signed_cert)
    assert not is_authority, "Cannot use this ca certificate as a client identity."

    # Reject certificates that ca_cert did not actually issue.
    signed_cert.verify_directly_issued_by(ca_cert)

    issuer = rfc4514name(ca_cert.subject)
    id_dir = self.config / "id"
    id_dir.mkdir(exist_ok=True, parents=True)

    target = id_dir / f"{issuer}.crt"
    if not overwrite:
        if target.exists():
            raise FileExistsError(target)
    pem = cert_to_pem(signed_cert)
    target.write_text(pem)

add_service(name, srv, overwrite=False)

Add the certificate to known_servers with the given info. See TrustedService for documentation on the attributes.

Source code in certified/cert.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def add_service(self,
                name : Pstr,
                srv  : TrustedService,
                overwrite:bool = False) -> None:
    """ Record a service description as `known_servers/<name>.yaml`.

        The YAML content is `srv.model_dump()`.  See `TrustedService`
        for documentation on the attributes.  The service URL must
        have a netloc and no params, query or fragment.

        Raises:
          AssertionError: if the service URL breaks the rules above.
          FileExistsError: when the file is already present
                           and `overwrite` is False.
    """
    parts = urlparse(srv.url)
    assert parts.netloc != "", "Service URL must define a hostname!"
    # These three components must all be empty, checked in this order.
    for component in ("params", "query", "fragment"):
        assert getattr(parts, component) == "", f"Service URL must not use {component}."

    target = self.config / "known_servers" / (str(name) + ".yaml")
    if not overwrite:
        if target.exists():
            raise FileExistsError(target)
    document = yaml.dump(srv.model_dump())
    target.write_text(document)

get_chain_from(auths)

Get a certificate chain from any of the signers to either "id.crt" (preferred) or "CA.crt" -> "id.crt" (second choice).

Returns the appropriate end-entity certificate along with a concatenation of all the certificates in the chain.

Source code in certified/cert.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def get_chain_from(self, auths : List[str]) -> Tuple[bytes, bytes]:
    """ Pick the certificate to present to a server that trusts `auths`.

        Certificates are matched by file name: `id/<signer>.crt`
        is preferred (the end-entity certificate was signed directly),
        then `CA/<signer>.crt` (the signer signed our CA, which in
        turn signed "id.crt").

        Returns a pair of byte strings: the end-entity certificate,
        and the chain file's content (empty for a direct signature).

        Raises:
          KeyError: if neither directory holds a `.crt` file named
                    after one of `auths`.
    """
    wanted = set(auths)

    def first_signed(subdir):
        # First "<signer>.crt" entry in <config>/<subdir>, or None
        # (also None when that directory does not exist).
        folder = self.config / subdir
        if not folder.is_dir():
            return None
        for entry in folder.iterdir():
            if entry.suffix == ".crt" and entry.stem in wanted:
                return entry
        return None

    direct = first_signed("id")
    if direct is not None:
        # End-entity certificate signed by a signer: no chain needed.
        _logger.debug("Found end-entity signature from %s", direct.name)
        return direct.read_bytes(), b""

    via_ca = first_signed("CA")
    if via_ca is not None:
        # Our CA was signed by a signer: present id.crt plus that chain.
        leaf = (self.config / "id.crt").read_bytes()
        _logger.debug("Found CA signature from %s", via_ca.name)
        return leaf, via_ca.read_bytes()

    raise KeyError(f"Unable to find any certificate signed from any authorizers in ({wanted}).")

lookup_server(name)

Check if the server is known.

If so, return the service's config.

Source code in certified/cert.py
101
102
103
104
105
106
107
108
109
110
111
def lookup_server(self, name) -> Optional[TrustedService]:
    """ Return the stored description of a known server, if any.

        Reads `known_servers/<name>.yaml` and validates it as a
        `TrustedService`.  Returns None when that file does not exist.
    """
    entry = self.config / "known_servers" / f"{name}.yaml"
    if entry.exists():
        with open(entry, "r", encoding="utf-8") as stream:
            data = yaml.safe_load(stream)
        return TrustedService.model_validate(data)
    return None

new(name, san, certified_config=None, key_type=KeyType.ed25519, overwrite=False) classmethod

Create a new CA and identity certificate

Parameters:

Name Type Description Default
name Name

the distinguished name for the signing key

required
san SubjectAlternativeName

subject alternate name fields for the entity certificate

required
certified_config Optional[Pstr]

base directory to output the new identity

None
overwrite bool

if True, any existing files will be deleted first

False
Source code in certified/cert.py
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
@classmethod
def new(cls,
        name : x509.Name,
        san : x509.SubjectAlternativeName,
        certified_config : Optional[Pstr] = None,
        key_type : KeyType = KeyType.ed25519,
        overwrite : bool = False,
       ) -> "Certified":
    """ Generate a signing CA plus an identity and write a fresh config.

    Args:
      name: the distinguished name for the signing key
      san:   subject alternate name fields for the entity certificate
      certified_config: base directory to output the new identity
      key_type: key type passed to both `CA.new` and `leaf_cert`
      overwrite: if True, any existing files will be deleted first

    Raises:
      FileExistsError: if `overwrite` is False and the directory
                       exists but could not be removed as empty.
    """
    signing_name = append_pseudonym(name, "Signing Certificate")
    authority = CA.new(signing_name, key_type=key_type)
    leaf = authority.leaf_cert(name, san, key_type=key_type)

    target = layout.config(certified_config, False)
    try:
        if overwrite:
            shutil.rmtree(target)  # remove existing config!
        else:
            target.rmdir()  # only succeeds if dir. is empty
    except FileNotFoundError:
        pass  # not created yet - OK
    except OSError:
        if overwrite:
            raise
        raise FileExistsError(target)
    target.mkdir(exist_ok=True, parents=True)

    authority.save(target / "CA", False)
    leaf.save(target / "id", False)

    # A fresh configuration trusts its own CA in both directions.
    trust_dirs = [target / "known_servers", target / "known_clients"]
    for folder in trust_dirs:
        folder.mkdir()
    for folder in trust_dirs:
        shutil.copy(target / "CA.crt", folder / "self.crt")
    return cls(target)

replace_baseurl

Replace url's netloc with new_base, and prepend the path from new_base to url.path.

Leaves all other parts of url unchanged.

Source code in certified/cert.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def replace_baseurl(url : URL, new_base : str) -> URL:
    """ Replace url's netloc with new_base,
        and prepend the path from new_base to url.path.

        Leaves all other parts of url unchanged.

        Args:
          url: the parsed URL being rewritten
          new_base: replacement base; may carry a host, an optional
                    port and a path, but no params, query, fragment
                    or userinfo.  Its scheme must be empty or match
                    url's scheme.

        Returns:
          A new URL using the (lower-cased) host of new_base.  The
          port comes from new_base when given there, otherwise from url.

        Raises:
          AssertionError: if new_base breaks the rules above, or a
                          port is given in both url and new_base.
    """
    new = urlparse(new_base)
    assert new.scheme == "" or new.scheme == url.scheme, "Cannot change protocol scheme."
    assert new.params   == "", "URL must not contain params"
    assert new.query    == "", "URL must not contain query"
    assert new.fragment == "", "URL must not contain fragment"

    # find host:port combination.
    host = new.hostname
    assert host is not None
    # `hostname` is lower-cased and has IPv6 brackets stripped, so the
    # netloc is compared case-insensitively against a re-bracketed host.
    # Otherwise "Example.com" or "[::1]" would be rejected as malformed.
    host_text = f"[{host}]" if ":" in host else host
    netloc = new.netloc.lower()
    port = new.port
    if port is None:
        assert netloc == host_text, "URL's netloc must define only a hostname and optional port."
        port = url.port # keep any url port, if defined
    else:
        assert netloc == f"{host_text}:{port}", "URL's netloc must define only a hostname and optional port."
        assert url.port is None, "Cannot define port in both the server's url field and the server name."

    # join the two paths with new at the left.
    if new.path == "":
        new_path = url.path
    else:
        new_path = str(PurePosixPath(new.path) / url.path.lstrip("/"))
    return URL(scheme   = url.scheme,
               netloc   = f"{host_text}:{port}" if port else host_text,
               path     = new_path,
               params   = url.params,
               query    = url.query,
               fragment = url.fragment)