Add more type hints.
This commit is contained in:
parent
8d0429946d
commit
31165cb6fb
|
|
@ -1,11 +1,14 @@
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
|
from pathlib import Path
|
||||||
|
from types import ModuleType
|
||||||
|
from typing import Any, Iterator, Union
|
||||||
|
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
|
|
||||||
from .utils import sanitize_df
|
from .utils import sanitize_df
|
||||||
|
|
||||||
|
|
||||||
def get_columns_from_headers(backend, row):
|
def get_columns_from_headers(backend: ModuleType, row: Any) -> list[str]:
|
||||||
repeat_until = -1
|
repeat_until = -1
|
||||||
repeat_value = None
|
repeat_value = None
|
||||||
# columns as lists in a dictionary
|
# columns as lists in a dictionary
|
||||||
|
|
@ -33,21 +36,29 @@ def get_columns_from_headers(backend, row):
|
||||||
return columns
|
return columns
|
||||||
|
|
||||||
|
|
||||||
def get_generic_columns(row):
|
def get_generic_columns(row: Any) -> list[str]:
|
||||||
return [f"column.{j}" for j in range(len(row))]
|
return [f"column.{j}" for j in range(len(row))]
|
||||||
|
|
||||||
|
|
||||||
def get_columns(backend, row, headers):
|
def get_columns(backend: ModuleType, row: Any, headers: bool) -> list[str]:
|
||||||
if headers:
|
if headers:
|
||||||
return get_columns_from_headers(backend, row)
|
return get_columns_from_headers(backend, row)
|
||||||
return get_generic_columns(row)
|
return get_generic_columns(row)
|
||||||
|
|
||||||
|
|
||||||
def parse_data(backend, rows, headers=True, columns=None, skiprows=0):
|
def parse_data(
|
||||||
df_dict = OrderedDict()
|
backend: ModuleType,
|
||||||
col_index = {}
|
rows: Iterator[list[Any]],
|
||||||
|
headers: bool,
|
||||||
|
columns: list[str],
|
||||||
|
skiprows: int,
|
||||||
|
) -> pd.DataFrame:
|
||||||
|
df_dict: OrderedDict[str, Any] = OrderedDict()
|
||||||
|
col_index: dict[int, str] = {}
|
||||||
|
|
||||||
for _ in range(skiprows):
|
for _ in range(skiprows):
|
||||||
next(rows)
|
next(rows)
|
||||||
|
|
||||||
for i, row in enumerate(rows):
|
for i, row in enumerate(rows):
|
||||||
# row is a list of cells
|
# row is a list of cells
|
||||||
if i == 0:
|
if i == 0:
|
||||||
|
|
@ -74,7 +85,14 @@ def parse_data(backend, rows, headers=True, columns=None, skiprows=0):
|
||||||
return pd.DataFrame(df_dict)
|
return pd.DataFrame(df_dict)
|
||||||
|
|
||||||
|
|
||||||
def read_data(backend, file_or_path, sheet_id, headers=True, columns=None, skiprows=0):
|
def read_data(
|
||||||
|
backend: ModuleType,
|
||||||
|
file_or_path: Path,
|
||||||
|
sheet_id: Union[str, int],
|
||||||
|
headers: bool,
|
||||||
|
columns: list[str],
|
||||||
|
skiprows: int,
|
||||||
|
) -> pd.DataFrame:
|
||||||
doc = backend.get_doc(file_or_path)
|
doc = backend.get_doc(file_or_path)
|
||||||
rows = backend.get_rows(doc, sheet_id)
|
rows = backend.get_rows(doc, sheet_id)
|
||||||
df = parse_data(backend, rows, headers=headers, columns=columns, skiprows=skiprows)
|
df = parse_data(backend, rows, headers=headers, columns=columns, skiprows=skiprows)
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,5 @@
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
from lxml import etree
|
from lxml import etree
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -13,7 +15,7 @@ TABLE_CELL_REPEATED_ATTRIB = "number-columns-repeated"
|
||||||
VALUE_TYPE_ATTRIB = "value-type"
|
VALUE_TYPE_ATTRIB = "value-type"
|
||||||
|
|
||||||
|
|
||||||
def get_doc(file_or_path):
|
def get_doc(file_or_path: Path):
|
||||||
return etree.parse(str(file_or_path))
|
return etree.parse(str(file_or_path))
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -21,7 +23,8 @@ def get_sheet(spreadsheet, sheet_id):
|
||||||
namespaces = spreadsheet.nsmap
|
namespaces = spreadsheet.nsmap
|
||||||
if isinstance(sheet_id, str):
|
if isinstance(sheet_id, str):
|
||||||
sheet = spreadsheet.find(
|
sheet = spreadsheet.find(
|
||||||
f"{TABLE_TAG}[@table:name='{sheet_id}']", namespaces=namespaces
|
f"{TABLE_TAG}[@table:name='{sheet_id}']",
|
||||||
|
namespaces=namespaces,
|
||||||
)
|
)
|
||||||
if sheet is None:
|
if sheet is None:
|
||||||
raise KeyError(f"There is no sheet named {sheet_id}.")
|
raise KeyError(f"There is no sheet named {sheet_id}.")
|
||||||
|
|
|
||||||
|
|
@ -1,21 +1,28 @@
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any, Iterator, Union
|
||||||
|
|
||||||
import ezodf
|
import ezodf
|
||||||
|
from ezodf.document import FlatXMLDocument, PackagedDocument
|
||||||
|
|
||||||
|
|
||||||
def get_doc(file_or_path):
|
def get_doc(file_or_path: Path) -> Union[FlatXMLDocument, PackagedDocument]:
|
||||||
return ezodf.opendoc(file_or_path)
|
return ezodf.opendoc(file_or_path)
|
||||||
|
|
||||||
|
|
||||||
def get_rows(doc, sheet_id):
|
def get_rows(
|
||||||
|
doc: Union[FlatXMLDocument, PackagedDocument],
|
||||||
|
sheet_id: Union[str, int],
|
||||||
|
) -> Iterator[list[ezodf.Cell]]:
|
||||||
if not isinstance(sheet_id, (int, str)):
|
if not isinstance(sheet_id, (int, str)):
|
||||||
raise ValueError("Sheet id has to be either `str` or `int`")
|
raise ValueError("Sheet id has to be either `str` or `int`")
|
||||||
if isinstance(sheet_id, str):
|
if isinstance(sheet_id, str):
|
||||||
sheets = [sheet.name for sheet in doc.sheets]
|
sheets: list[str] = [sheet.name for sheet in doc.sheets]
|
||||||
if sheet_id not in sheets:
|
if sheet_id not in sheets:
|
||||||
raise KeyError("There is no sheet named {}".format(sheet_id))
|
raise KeyError("There is no sheet named {}".format(sheet_id))
|
||||||
sheet_id = sheets.index(sheet_id) + 1
|
sheet_id = sheets.index(sheet_id) + 1
|
||||||
sheet = doc.sheets[sheet_id - 1]
|
sheet: ezodf.Sheet = doc.sheets[sheet_id - 1]
|
||||||
return sheet.rows()
|
return sheet.rows()
|
||||||
|
|
||||||
|
|
||||||
def get_value(cell, parsed=False):
|
def get_value(cell: ezodf.Cell, parsed: bool = False) -> tuple[Any, int]:
|
||||||
return cell.value, 0
|
return cell.value, 0
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
"""Provides utility functions for the parser"""
|
"""Provides utility functions for the parser"""
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
|
||||||
def ods_info(doc):
|
def ods_info(doc):
|
||||||
|
|
@ -14,8 +15,8 @@ def ods_info(doc):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def sanitize_df(df):
|
def sanitize_df(df: pd.DataFrame) -> pd.DataFrame:
|
||||||
"""Drop empty rows and columns from the DataFrame and returns it"""
|
"""Drop empty rows and columns from the DataFrame and return it."""
|
||||||
# Delete empty rows
|
# Delete empty rows
|
||||||
for i in df.index.tolist()[-1::-1]:
|
for i in df.index.tolist()[-1::-1]:
|
||||||
if df.iloc[i].isna().all():
|
if df.iloc[i].isna().all():
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue