Skip to content

certified.blob — Blob: Safe PEM File I/O

Blob is a thin wrapper around bytes that tracks whether the data is secret (private key) or public (certificate), and enforces appropriate file permissions (0o600 for secrets, 0o644 for public data) on write.

Blob

A convenience wrapper for a blob of bytes, mostly representing PEM-encoded data.

Parameters:

Name Type Description Default
data bytes

the PEM-encoded data.

required
is_secret bool

True for private-key material (writes with mode 0o600), False for public data (0o644).

required
Source code in certified/blob.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
class Blob:
    """A convenience wrapper for a blob of bytes, mostly
    representing PEM-encoded data.

    Args:
      data: the PEM-encoded data.
      is_secret: True for private-key material (writes with mode 0o600), False for public data (0o644).
    """

    def __init__(self, data: bytes, is_secret : bool) -> None:
        self._data = data
        self.is_secret = is_secret

    @classmethod
    def read(cls, fname: Pstr) -> "Blob":
        is_secret = is_user_only(fname)
        with open(fname, "rb") as f:
            data = f.read()
        return cls(data, is_secret)

    def bytes(self) -> bytes:
        """Returns the data as a `bytes` object."""
        return self._data

    def __str__(self) -> str:
        if self.is_secret:
            return "*********"
        return self.bytes().decode("ascii")

    def write(
        self, path: Pstr, append: bool = False
    ) -> None:
        """Writes the data to the file at the given path.

        Args:
          path: The path to write to.
          append: If False (the default), replace any existing file
               with the given name. If True, append to any existing file.
        """
        p = Path(path)
        ctxt : AbstractContextManager[IO[Any]]
        if append:
            if self.is_secret:
                assert is_user_only(p)
            ctxt = p.open("ab")
        else:
            if self.is_secret:
                ctxt = new_file(p, "wb", 0o600)
            else:
                ctxt = new_file(p, "wb", 0o644)
                #ctxt = p.open("wb")

        with ctxt as f:
            f.write(self._data)

    @contextmanager
    def tempfile(self, dir: Optional[str] = None) -> Iterator[str]:
        """Context manager for writing data to a temporary file.

        The file is created when you enter the context manager, and
        automatically deleted when the context manager exits.

        Many libraries have annoying APIs which require that certificates be
        specified as filesystem paths, so even if you have already the data in
        memory, you have to write it out to disk and then let them read it
        back in again. If you encounter such a library, you should probably
        file a bug. But in the mean time, this context manager makes it easy
        to give them what they want.

        Example:

          Here's how to get requests to use a CA (`see also
          <http://docs.python-requests.org/en/master/user/advanced/#ssl-cert-verification>`__)::

           ca = certified.CA()
           with ca.cert_pem.tempfile() as ca_cert_path:
               requests.get("https://localhost/...", verify=ca_cert_path)

        Args:
          dir: Passed to `tempfile.NamedTemporaryFile`.

        """
        with NamedTemporaryFile(suffix=".pem", dir=dir, delete=False) as f:
            try:
                os.chmod(f.name, 0o600)
                f.write(self._data)
                f.close()
                yield f.name
            finally:
                f.close() # in case chmod() or write() raised an error
                os.unlink(f.name)

bytes()

Returns the data as a bytes object.

Source code in certified/blob.py
94
95
96
def bytes(self) -> bytes:
    """Returns the data as a `bytes` object."""
    return self._data

tempfile(dir=None)

Context manager for writing data to a temporary file.

The file is created when you enter the context manager, and automatically deleted when the context manager exits.

Many libraries have annoying APIs which require that certificates be specified as filesystem paths, so even if you have already the data in memory, you have to write it out to disk and then let them read it back in again. If you encounter such a library, you should probably file a bug. But in the mean time, this context manager makes it easy to give them what they want.

Example:

Here's how to get requests to use a CA (see also <http://docs.python-requests.org/en/master/user/advanced/#ssl-cert-verification>__)::

ca = certified.CA() with ca.cert_pem.tempfile() as ca_cert_path: requests.get("https://localhost/...", verify=ca_cert_path)

Parameters:

Name Type Description Default
dir Optional[str]

Passed to tempfile.NamedTemporaryFile.

None
Source code in certified/blob.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
@contextmanager
def tempfile(self, dir: Optional[str] = None) -> Iterator[str]:
    """Context manager for writing data to a temporary file.

    The file is created when you enter the context manager, and
    automatically deleted when the context manager exits.

    Many libraries have annoying APIs which require that certificates be
    specified as filesystem paths, so even if you have already the data in
    memory, you have to write it out to disk and then let them read it
    back in again. If you encounter such a library, you should probably
    file a bug. But in the mean time, this context manager makes it easy
    to give them what they want.

    Example:

      Here's how to get requests to use a CA (`see also
      <http://docs.python-requests.org/en/master/user/advanced/#ssl-cert-verification>`__)::

       ca = certified.CA()
       with ca.cert_pem.tempfile() as ca_cert_path:
           requests.get("https://localhost/...", verify=ca_cert_path)

    Args:
      dir: Passed to `tempfile.NamedTemporaryFile`.

    """
    with NamedTemporaryFile(suffix=".pem", dir=dir, delete=False) as f:
        try:
            os.chmod(f.name, 0o600)
            f.write(self._data)
            f.close()
            yield f.name
        finally:
            f.close() # in case chmod() or write() raised an error
            os.unlink(f.name)

write(path, append=False)

Writes the data to the file at the given path.

Parameters:

Name Type Description Default
path Pstr

The path to write to.

required
append bool

If False (the default), replace any existing file with the given name. If True, append to any existing file.

False
Source code in certified/blob.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def write(
    self, path: Pstr, append: bool = False
) -> None:
    """Writes the data to the file at the given path.

    Args:
      path: The path to write to.
      append: If False (the default), replace any existing file
           with the given name. If True, append to any existing file.
    """
    p = Path(path)
    ctxt : AbstractContextManager[IO[Any]]
    if append:
        if self.is_secret:
            assert is_user_only(p)
        ctxt = p.open("ab")
    else:
        if self.is_secret:
            ctxt = new_file(p, "wb", 0o600)
        else:
            ctxt = new_file(p, "wb", 0o644)
            #ctxt = p.open("wb")

    with ctxt as f:
        f.write(self._data)

PublicBlob

Bases: Blob

Source code in certified/blob.py
166
167
168
169
class PublicBlob(Blob):
    def __init__(self, cert : Union[x509.Certificate,
                                    x509.CertificateSigningRequest]) -> None:
        super().__init__(cert.public_bytes(Encoding.PEM), False)

PrivateBlob

Bases: Blob

Source code in certified/blob.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
class PrivateBlob(Blob):
    def __init__(self, key : CertificateIssuerPrivateKeyTypes) -> None:
        try:
            pkey = key.private_bytes(
                Encoding.PEM,
                PrivateFormat.PKCS8,
                NoEncryption())
        except ValueError:
            pkey = key.private_bytes(Encoding.PEM,
                PrivateFormat.TraditionalOpenSSL,
                #PrivateFormat.PKCS8,
                #PrivateFormat.OpenSSH,
                NoEncryption())
        super().__init__(pkey, True)

is_user_only

Source code in certified/blob.py
45
46
47
def is_user_only(fname) -> bool:
    stat = os.stat(fname)
    return (stat.st_mode & 0o77) == 0

new_file

Fix the file open() API to create new files securely.

Ref: https://stackoverflow.com/questions/5624359/write-file-with-specific-permissions-in-python @Asclepius

Source code in certified/blob.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@contextmanager
def new_file(fname : Pstr, mode : str, perm : int,
             remove=False) -> Iterator[IO[Any]]:
    """ Fix the file open() API to create
        new files securely.

        Ref: https://stackoverflow.com/questions/5624359/write-file-with-specific-permissions-in-python @Asclepius
    """
    flags = os.O_RDWR | os.O_CREAT | os.O_EXCL

    if remove:
        try:
            os.remove(fname)
        except FileNotFoundError:
            pass

    # open fails if file exists
    fdesc = os.open(fname, flags, perm)
    with os.fdopen(fdesc, mode) as f:
        # this context closes fd on completion
        yield f