100 lines
3.0 KiB
Python
100 lines
3.0 KiB
Python
from collections import OrderedDict
|
|
from pathlib import Path
|
|
from types import ModuleType
|
|
from typing import Any, Dict, Iterator, List, Union
|
|
|
|
import pandas as pd
|
|
|
|
from .utils import sanitize_df
|
|
|
|
|
|
def get_columns_from_headers(backend: ModuleType, row: Any) -> List[str]:
|
|
repeat_until = -1
|
|
repeat_value = None
|
|
# columns as lists in a dictionary
|
|
columns = []
|
|
# parse the first row as column names
|
|
for k, cell in enumerate(row):
|
|
value, n_repeated = backend.get_value(cell)
|
|
if n_repeated > 0:
|
|
repeat_value = value
|
|
repeat_until = n_repeated + k
|
|
if not value and k <= repeat_until:
|
|
value = repeat_value
|
|
if k == repeat_until:
|
|
# reset to allow for more than one repeated column
|
|
repeat_until = -1
|
|
if value and value not in columns:
|
|
columns.append(value)
|
|
else:
|
|
column_name = value if value else "unnamed"
|
|
# add count to column name
|
|
idx = 1
|
|
while f"{column_name}.{idx}" in columns:
|
|
idx += 1
|
|
columns.append(f"{column_name}.{idx}")
|
|
return columns
|
|
|
|
|
|
def get_generic_columns(row: Any) -> List[str]:
|
|
return [f"column.{j}" for j in range(len(row))]
|
|
|
|
|
|
def get_columns(backend: ModuleType, row: Any, headers: bool) -> List[str]:
|
|
if headers:
|
|
return get_columns_from_headers(backend, row)
|
|
return get_generic_columns(row)
|
|
|
|
|
|
def parse_data(
|
|
backend: ModuleType,
|
|
rows: Iterator[List[Any]],
|
|
headers: bool,
|
|
columns: List[str],
|
|
skiprows: int,
|
|
) -> pd.DataFrame:
|
|
df_dict: OrderedDict[str, Any] = OrderedDict()
|
|
col_index: Dict[int, str] = {}
|
|
|
|
for _ in range(skiprows):
|
|
next(rows)
|
|
|
|
for i, row in enumerate(rows):
|
|
# row is a list of cells
|
|
if i == 0:
|
|
columns = columns or get_columns(backend, row, headers)
|
|
df_dict = OrderedDict((column, []) for column in columns)
|
|
# create index for the column headers
|
|
col_index = {j: column for j, column in enumerate(columns)}
|
|
if headers:
|
|
continue
|
|
|
|
for j, cell in enumerate(row):
|
|
if j < len(col_index):
|
|
value, _ = backend.get_value(cell, parsed=True)
|
|
# use header instead of column index
|
|
df_dict[col_index[j]].append(value)
|
|
|
|
# make sure all columns are of the same length
|
|
max_col_length = max(len(df_dict[col]) for col in df_dict)
|
|
for col in df_dict:
|
|
col_length = len(df_dict[col])
|
|
if col_length < max_col_length:
|
|
df_dict[col] += [None] * (max_col_length - col_length)
|
|
|
|
return pd.DataFrame(df_dict)
|
|
|
|
|
|
def read_data(
|
|
backend: ModuleType,
|
|
file_or_path: Path,
|
|
sheet_id: Union[str, int],
|
|
headers: bool,
|
|
columns: List[str],
|
|
skiprows: int,
|
|
) -> pd.DataFrame:
|
|
doc = backend.get_doc(file_or_path)
|
|
rows = backend.get_rows(doc, sheet_id)
|
|
df = parse_data(backend, rows, headers=headers, columns=columns, skiprows=skiprows)
|
|
return sanitize_df(df)
|