Skip to content

sqlite

SQLiteReporter

Bases: BenchmarkReporter

Source code in src/nnbench/reporter/sqlite.py
class SQLiteReporter(BenchmarkReporter):
    @staticmethod
    def strip_protocol(uri: str | os.PathLike[str]) -> str:
        s = str(uri)
        if s.startswith("sqlite://"):
            return s[9:]
        return s

    def read(self, uri: str | os.PathLike[str], options: dict[str, Any]) -> BenchmarkResult:
        uri = self.strip_protocol(uri)
        query: str | None = options.pop("query", _DEFAULT_READ_QUERY)
        if query is None:
            raise ValueError(f"need a query to read from SQLite Database {uri!r}")

        db = f"file:{uri}?mode=ro"  # open DB in read-only mode
        conn = sqlite3.connect(db, uri=True)
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute(query)
        records = [dict(r) for r in cursor.fetchall()]
        conn.close()
        # TODO: Partition list on run ID, make record for each of them
        return BenchmarkResult.from_records(records)

    def write(
        self, result: BenchmarkResult, uri: str | os.PathLike[str], options: dict[str, Any]
    ) -> None:
        uri = self.strip_protocol(uri)
        query: str | None = options.pop("query", _DEFAULT_INSERT_QUERY)
        if query is None:
            raise ValueError(f"need a query to write to SQLite Database {uri!r}")

        conn = sqlite3.connect(uri)
        cursor = conn.cursor()

        # TODO: Guard by exists_ok state
        cursor.execute(_DEFAULT_CREATION_QUERY)

        records = result.to_records()
        cursor.executemany(query, records)
        conn.commit()

strip_protocol staticmethod

strip_protocol(uri: str | PathLike[str]) -> str
Source code in src/nnbench/reporter/sqlite.py
@staticmethod
def strip_protocol(uri: str | os.PathLike[str]) -> str:
    s = str(uri)
    if s.startswith("sqlite://"):
        return s[9:]
    return s

read

read(uri: str | PathLike[str], options: dict[str, Any]) -> BenchmarkResult
Source code in src/nnbench/reporter/sqlite.py
def read(self, uri: str | os.PathLike[str], options: dict[str, Any]) -> BenchmarkResult:
    uri = self.strip_protocol(uri)
    query: str | None = options.pop("query", _DEFAULT_READ_QUERY)
    if query is None:
        raise ValueError(f"need a query to read from SQLite Database {uri!r}")

    db = f"file:{uri}?mode=ro"  # open DB in read-only mode
    conn = sqlite3.connect(db, uri=True)
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    cursor.execute(query)
    records = [dict(r) for r in cursor.fetchall()]
    conn.close()
    # TODO: Partition list on run ID, make record for each of them
    return BenchmarkResult.from_records(records)

write

write(result: BenchmarkResult, uri: str | PathLike[str], options: dict[str, Any]) -> None
Source code in src/nnbench/reporter/sqlite.py
def write(
    self, result: BenchmarkResult, uri: str | os.PathLike[str], options: dict[str, Any]
) -> None:
    uri = self.strip_protocol(uri)
    query: str | None = options.pop("query", _DEFAULT_INSERT_QUERY)
    if query is None:
        raise ValueError(f"need a query to write to SQLite Database {uri!r}")

    conn = sqlite3.connect(uri)
    cursor = conn.cursor()

    # TODO: Guard by exists_ok state
    cursor.execute(_DEFAULT_CREATION_QUERY)

    records = result.to_records()
    cursor.executemany(query, records)
    conn.commit()