"""
Write output files.
"""
import difflib
import os
from pathlib import Path
from typing import Dict
from typing import List
from typing import Tuple
import numpy as np
import pandas as pd
import pulp
from scipy import stats as s
from sklearn.preprocessing import MinMaxScaler
import wip.modules.ops as operations
from wip.constants import constants
from wip.datatools.io_ops import read_csv
from wip.datatools.io_ops import read_json
from wip.datatools.io_ops import to_csv
from wip.datatools.io_ops import to_excel
from wip.files.depara_pisystem import pims_to_pisystem_dict
from wip.logging_config import log
from wip.modules import solver_ops
from wip.utils import exists
[docs]def write_objective_function_coef(solver_path, scalers):
real_cost = read_json(f"{solver_path}/custo_real.json")
tokens = solver_ops.define_range_constraints("energia_moinho", 1, 2)
real_cost = solver_ops.adjust_real_cost(real_cost, tokens)
tokens = solver_ops.define_range_constraints("corpo_moedor_especifico_{}", 1, 4)
real_cost = solver_ops.adjust_real_cost(real_cost, tokens, 1, 1)
# opt_costs = solver_ops.scale_optimization_tags(scalers, real_cost)
obj_coefs = pd.DataFrame.from_dict(real_cost, orient='index', columns=['Custo'])
obj_coefs.index.name = "TAG"
obj_coefs = obj_coefs.loc[constants.OBJ_FUNC_COEF]
to_csv(
obj_coefs,
os.path.join(solver_path, "costs.csv"),
decimal=',',
sep=';',
index=True,
)
return obj_coefs
[docs]def rot_pot_filtragem(solver_path, scalers, datasets):
custos_reais = (
read_csv(
os.path.join(solver_path, "costs.csv"),
decimal=",",
sep=";",
index_col="TAG",
)
.to_dict()
.get("Custo")
)
# custos_reais = solver_ops.unnormalize_optimization_tags(
# scalers, custos_reais)
results_otm = []
inv_tag_2_var = {v: k for k, v in constants.TARGETS_IN_MODEL.items()}
columns = [
"faixa",
"TAG",
"minimo",
"maximo",
"valor normalizado",
"valor real",
"custo",
]
df_output = pd.DataFrame(columns=columns)
rot_func = {
"ROTA1_I@08FI-FL-827I-01M1": {
"750": [],
"800": [],
"850": [],
"900": [],
"950": [],
"1000": [],
},
"ROTA1_I@08FI-FL-827I-02M1": {
"750": [],
"800": [],
"850": [],
"900": [],
"950": [],
"1000": [],
},
"ROTA1_I@08FI-FL-827I-03M1": {
"750": [],
"800": [],
"850": [],
"900": [],
"950": [],
"1000": [],
},
"ROTA1_I@08FI-FL-827I-04M1": {
"750": [],
"800": [],
"850": [],
"900": [],
"950": [],
"1000": [],
},
"ROTA1_I@08FI-FL-827I-05RM1": {
"750": [],
"800": [],
"850": [],
"900": [],
"950": [],
"1000": [],
},
"ROTA1_I@08FI-FL-827I-06M1": {
"750": [],
"800": [],
"850": [],
"900": [],
"950": [],
"1000": [],
},
"ROTA1_I@08FI-FL-827I-07M1": {
"750": [],
"800": [],
"850": [],
"900": [],
"950": [],
"1000": [],
},
"ROTA1_I@08FI-FL-827I-08M1": {
"750": [],
"800": [],
"850": [],
"900": [],
"950": [],
"1000": [],
},
"ROTA1_I@08FI-FL-827I-09M1": {
"750": [],
"800": [],
"850": [],
"900": [],
"950": [],
"1000": [],
},
"ROTA1_I@08FI-FL-827I-10RM1": {
"750": [],
"800": [],
"850": [],
"900": [],
"950": [],
"1000": [],
},
}
pot_func = {
"POTE1_I@08FI-BV-827I-01M1": {
"750": [],
"800": [],
"850": [],
"900": [],
"950": [],
"1000": [],
},
"POTE1_I@08FI-BV-827I-02M1": {
"750": [],
"800": [],
"850": [],
"900": [],
"950": [],
"1000": [],
},
"POTE1_I@08FI-BV-827I-03M1": {
"750": [],
"800": [],
"850": [],
"900": [],
"950": [],
"1000": [],
},
"POTE1_I@08FI-BV-827I-04M1": {
"750": [],
"800": [],
"850": [],
"900": [],
"950": [],
"1000": [],
},
"POTE1_I@08FI-BV-827I-05RM1": {
"750": [],
"800": [],
"850": [],
"900": [],
"950": [],
"1000": [],
},
"POTE1_I@08FI-BV-827I-06M1": {
"750": [],
"800": [],
"850": [],
"900": [],
"950": [],
"1000": [],
},
"POTE1_I@08FI-BV-827I-07M1": {
"750": [],
"800": [],
"850": [],
"900": [],
"950": [],
"1000": [],
},
"POTE1_I@08FI-BV-827I-08M1": {
"750": [],
"800": [],
"850": [],
"900": [],
"950": [],
"1000": [],
},
"POTE1_I@08FI-BV-827I-09M1": {
"750": [],
"800": [],
"850": [],
"900": [],
"950": [],
"1000": [],
},
"POTE1_I@08FI-BV-827I-10RM1": {
"750": [],
"800": [],
"850": [],
"900": [],
"950": [],
"1000": [],
},
}
# INICIO: pos-processamento para garantir que o ROTA1_I@08FI-FL-827I seja igual ao FUNC * max_rot
for fmin, fmax in constants.production_range:
f = f"Variables - VarX_{fmin}-{fmax}.csv"
if not exists(os.path.join(solver_path, f)):
continue
df = read_csv(os.path.join(solver_path, f), sep=";", encoding="iso-8859-1")
for line in df.iterrows():
var_csv = line[1][" VariableName"]
var = var_csv
coef_real = coef = line[1][" Value"]
obj_coef_readl = obj_coef = line[1][" ObjCoeff"]
if "ROTA1_I@08FI-FL-827I" not in var and "POTE1_I@08FI-BV-827I" not in var:
continue
scaler = scalers.get(var)
if scaler is not None:
coef_real = scaler.inverse_transform([[coef]])[0][0]
if "ROTA1_I@08FI-FL-827I" in var:
rot_func[var][str(fmax)] = coef_real
if "POTE1_I@08FI-BV-827I" in var:
pot_func[var][str(fmax)] = coef_real
return rot_func, pot_func
[docs]def increasing_temp_gq(solver_path, scalers, datasets):
custos_reais = (
read_csv(
os.path.join(solver_path, "costs.csv"),
decimal=',',
sep=';',
index_col='TAG',
)
.to_dict()
.get('Custo')
)
# custos_reais = solver_ops.unnormalize_optimization_tags(
# scalers, custos_reais)
results_otm = []
inv_tag_2_var = {v: k for k, v in constants.TARGETS_IN_MODEL.items()}
columns = [
'faixa',
'TAG',
'minimo',
'maximo',
'valor normalizado',
'valor real',
'custo',
]
df_output = pd.DataFrame(columns=columns)
rot_func = {
'TEMP1_I@08QU-QU-855I-GQ03': [],
'TEMP1_I@08QU-QU-855I-GQ04': [],
'TEMP1_I@08QU-QU-855I-GQ05': [],
'TEMP1_I@08QU-QU-855I-GQ06': [],
'TEMP1_I@08QU-QU-855I-GQ07': [],
'TEMP1_I@08QU-QU-855I-GQ08': [],
'TEMP1_I@08QU-QU-855I-GQ09': [],
'TEMP1_I@08QU-QU-855I-GQ10': [],
'TEMP1_I@08QU-QU-855I-GQ11': [],
'TEMP1_I@08QU-QU-855I-GQ12': [],
'TEMP1_I@08QU-QU-855I-GQ13': [],
'TEMP1_I@08QU-QU-855I-GQ14': [],
'TEMP1_I@08QU-QU-855I-GQ15': [],
'TEMP1_I@08QU-QU-855I-GQ16': [],
}
# INICIO: pos-processamento para garantir que o ROTA1_I@08FI-FL-827I seja igual ao FUNC * max_rot
for fmin, fmax in constants.production_range:
f = f'Variables - VarX_{fmin}-{fmax}.csv'
if not exists(os.path.join(solver_path, f)):
continue
df = read_csv(os.path.join(solver_path, f), sep=';', encoding='iso-8859-1')
for line in df.loc[df[' VariableName'].isin(list(rot_func.keys())), :].sort_values(' VariableName').iterrows():
var_csv = line[1][' VariableName']
var = var_csv
coef_real = coef = line[1][' Value']
obj_coef_readl = obj_coef = line[1][' ObjCoeff']
if (
var
in [
'TEMP1_I@08QU-QU-855I-GQ01',
'TEMP1_I@08QU-QU-855I-GQ02',
'TEMP1_I@08QU-QU-855I-GQA',
'TEMP1_I@08QU-QU-855I-GQB',
'TEMP1_I@08QU-QU-855I-GQC',
]
or 'TEMP1_I@08QU-QU-855I-GQ' not in var
):
continue
obj_coef_real = operations.unnormalize_feature(
scalers, var, obj_coef, operation='one feature'
)
coef_real = operations.unnormalize_feature(
scalers, var, coef, operation='one feature'
)
if 'TEMP1_I@08QU-QU-855I-GQ' in var:
rot_func[var].append(coef_real)
# FIM: pos-processamento para garantir que o ROTA1_I@08FI-FL-827I seja igual ao FUNC * max_rot
return rot_func
[docs]def define_optimization_results(
solver_path: str,
scalers: Dict[str, MinMaxScaler],
datasets: Dict[str, pd.DataFrame],
solver,
):
custos_reais = (
read_csv(
os.path.join(solver_path, "costs.csv"),
decimal=",",
sep=";",
index_col="TAG",
)
.to_dict()
.get("Custo")
)
# custos_reais = solver_ops.unnormalize_optimization_tags(scalers, custos_reais)
results_otm = []
inv_tag_2_var = {v: k for k, v in constants.TARGETS_IN_MODEL.items()}
columns = [
"faixa",
"TAG",
"minimo",
"maximo",
"valor normalizado",
"valor real",
"custo",
]
df_output = pd.DataFrame(columns=columns)
# dens, gran_ocs, rota_fi = tags_equality(solver_path, scalers, datasets)
temp_gq = increasing_temp_gq(solver_path, scalers, datasets)
# temp_gq = {key: sorted(value) for key, value in temp_gq.items()}
production_range = constants.production_range
prod_range = {str(lmax): idx for idx, (_, lmax) in enumerate(production_range)}
for fmin, fmax in production_range:
try:
# rota_fi[str(fmax)] = np.array(rota_fi[str(fmax)])
results_otm = []
filename = f"Variables - VarX_{fmin}-{fmax}.csv"
optimization_results_fpath = os.path.join(solver_path, filename)
if not exists(optimization_results_fpath):
log.warning(
"No results found for production range %s-%s. "
"Filepath: %s. Skipping it.",
fmin,
fmax,
f"{optimization_results_fpath!r}",
)
continue
df_otimizacao = read_csv(
optimization_results_fpath, sep=";", encoding="iso-8859-1"
)
custo_final = 0
tag_to_fix = ["SE PP", "SUP_SE_PP_L@08PR", "SUP_SE_PR_L@08FI"]
tag_integer = [
"SOMA FUNC FILTROS",
"qtde_discos",
"qtde_moinhos",
"qtde_filtros",
]
for line in df_otimizacao.iterrows():
var_csv = line[1][" VariableName"]
var = var_csv
coef_real = coef = line[1][" Value"]
obj_coef_readl = obj_coef = line[1][" ObjCoeff"]
lb = line[1][" LB"]
ub = line[1][" UB"]
svar = var
if var in [
"NIVE1_C@08QU-FR-851I-01M1",
"NIVE2_I@08QU-FR-851I-01M1",
"NIVE3_I@08QU-FR-851I-01M1",
"NIVE4_I@08QU-FR-851I-01M1",
"NIVE5_I@08QU-FR-851I-01M1",
"NIVE6_I@08QU-FR-851I-01M1",
]:
svar = "nive"
if svar in scalers:
if (
svar not in constants.TARGETS_IN_MODEL.keys()
and svar not in constants.TARGETS_IN_MODEL.values()
and svar not in tag_to_fix
and svar not in tag_integer
):
if not svar.startswith("FUNC"):
coef_real = operations.unnormalize_feature(
scalers, svar, coef, operation="one feature"
)
obj_coef_real = operations.unnormalize_feature(
scalers, svar, obj_coef, operation="one feature"
)
ub = operations.unnormalize_feature(
scalers, svar, ub, operation="one feature"
)
lb = operations.unnormalize_feature(
scalers, svar, lb, operation="one feature"
)
if np.isnan(coef_real):
log.info('%s, " contains NaN ", %s', svar, coef_real)
coef_real = 0
elif (
svar in constants.TARGETS_IN_MODEL.values()
or svar in constants.TARGETS_IN_MODEL
or svar in tag_to_fix
or svar in tag_integer
):
try:
coef = operations.normalize_feature(
scalers, inv_tag_2_var.get(svar), coef_real
)
except Exception as exc: # pylint: disable=broad-except
# log.exception(exc)
# log.error("Could not normalize feature %s", svar)
if svar in [
"Calculo da Energia da Filtragem",
"SUP_SE_PP_L@08PR",
"SUP_SE_PR_L@08FI",
"COMP_MCOMP_PQ_L@08QU",
]:
ub = operations.unnormalize_feature(
scalers, svar, ub, operation="one feature"
)
lb = operations.unnormalize_feature(
scalers, svar, lb, operation="one feature"
)
coef = coef_real
coef_real = operations.unnormalize_feature(
scalers, svar, coef_real, operation="one feature"
)
else:
coef = operations.normalize_feature(
scalers, svar, coef_real
)
custo_final += coef_real * custos_reais.get(var, 0)
if "DENS1_C@08HO-BP-826I-" in svar:
coef_real = scalers[svar].inverse_transform([[coef_real]])[0][0]
# coef_real = dens
if "GRAN_OCS_TM@08PE-BD-840I" in svar:
coef_real = scalers[svar].inverse_transform([[coef_real]])[0][0]
# coef_real = gran_ocs[int(svar[-2:]) - 1]
# if "ROTA1_I@08FI-FL-827I-" in svar:
# rot = rota_fi[str(fmax)].nonzero()
# coef_real = rota_fi[str(fmax)][rot].mean() * rot_func[svar][str(fmax)]
# coef_real = scalers[svar].inverse_transform([[coef_real]])[0][0]
# if "POTE1_I@08FI-BV-827I-" in svar:
# coef_real *= pot_func[svar][str(fmax)]
if (
"TEMP1_I@08QU-QU-855I-GQ" in svar
and svar
not in [
"TEMP1_I@08QU-QU-855I-GQ01",
"TEMP1_I@08QU-QU-855I-GQA",
"TEMP1_I@08QU-QU-855I-GQB",
"TEMP1_I@08QU-QU-855I-GQC",
]
and svar in temp_gq
):
coef_real = temp_gq[svar][prod_range[str(fmax)]]
results_otm.append(
[
f"{fmin}-{fmax}",
var,
lb,
ub,
coef,
coef_real,
(custos_reais.get(var, 0)),
]
)
df = pd.DataFrame(results_otm, columns=columns)
df_output = pd.concat([df_output, df])
log.info(
"Faixa de produção: %s-%s - Custo: R$ %s",
fmin,
fmax,
f"{custo_final:.2f}",
)
except Exception as exc: # pylint: disable=broad-except
log.exception(exc)
log.error("Failed to save results for production range %s-%s", fmin, fmax)
try:
solver_ops.save_solver_results(solver_path, df_output)
except ValueError as exc:
log.exception(exc)
log.error("Could not save post-otm results.")
return df_output
class LpScaledConstraint:
"""
Class representing a scaled linear programming constraint.
This class is used to manage a set of constraints in a linear
programming problem, with methods for creating, updating and working
with these constraints.
Attributes
----------
prob : pulp.LpProblem
The linear programming problem to which constraints are added.
scalers : Dict[str, MinMaxScaler]
Dictionary mapping column names to MinMaxScaler instances.
datasets : Dict[str, pd.DataFrame]
Dictionary mapping column names to corresponding pandas DataFrames.
Examples
--------
After instantiating the `LpScaledConstraint` class, you can
generate a pandas `DataFrame` with the unscaled constraints,
using the method `create_constraints`:
>>> # noinspection PyUnresolvedReferences
>>> lp_scaled_constraint = LpScaledConstraint(prob, scalers, datasets)
>>> lp_scaled_constraint_df = lp_scaled_constraint.create_constraints()
See Also
--------
pulp.LpProblem : A class representing a linear programming problem.
sklearn.preprocessing.MinMaxScaler : Class used for scaling features.
pandas.DataFrame : Class representing a two-dimensional table of data.
.. versionchanged:: 0.2.0
* Fix the unscaling process that was being executed for all constraints,
instead of only being executed for the ones that needed to be unscaled.
"""
@property
def model_names(self) -> List[str]:
"""List of model names, with " " replaced by "_".
Returns
-------
List[str]
A list of model names, with any spaces replaced by underscores.
For example, "Model 1" becomes "Model_1".
"""
return [name.replace(' ', '_') for name in self.datasets.keys()]
@property
def target_columns(self) -> Dict[str, str]:
"""
Dictionary mapping LP variable names to each model target name.
Variables that share the same name as the regression models represent that model's
target variable. This property returns the mapping between model names and target column
names.
Returns
-------
Dict[str, str]
A dictionary mapping original model names to their
corresponding target names.
"""
return {
key.replace(' ', '_'): df.columns[-1] for key, df in self.datasets.items()
}
def __init__(
self,
prob: pulp.LpProblem,
scalers: Dict[str, MinMaxScaler],
datasets: Dict[str, pd.DataFrame],
):
"""
Initialize the `LpScaledConstraint` with a problem, scalers, and datasets.
The `scalers` and `datasets` are needed to unscale some of the constraints,
that are created using normalized values.
Parameters
----------
prob : pulp.LpProblem
The linear programming problem to which constraints are added.
scalers : Dict[str, MinMaxScaler]
Dictionary mapping column names to `MinMaxScaler` instances.
datasets : Dict[str, pd.DataFrame]
Dictionary mapping column names to corresponding pandas `DataFrames`.
"""
self.prob = prob
self.scalers = scalers
self.datasets = datasets
def create_constraints(self) -> pd.DataFrame:
"""Create a DataFrame of constraints in the linear programming problem.
Returns
-------
pd.DataFrame
A `pandas.DataFrame` containing the constraints of the
linear programming problem. Each row represents a constraint with
its name, value, and equation.
"""
constraints = []
for cname, c in self.prob.constraints.items():
unscale = cname in self.model_names
value, eq = self.create_single_constraint(c, unscale=unscale)
constraints.append([cname, value, eq])
return pd.DataFrame(constraints, columns=['Name', 'Value', 'Equation'])
def update_variables(
self, coeff: float, xvar: pulp.LpVariable
) -> Tuple[float, float, float]:
"""Update the coefficient and variable value of a constraint.
Parameters
----------
coeff : float
The coefficient of the variable in the constraint.
xvar : pulp.LpVariable
The variable in the constraint.
Returns
-------
Tuple[float, float, float]
A tuple containing the updated coefficient, variable value,
and the subtracted intercept.
"""
name = replace_symbols(xvar.name)
name = self.target_columns.get(name, name)
matches = difflib.get_close_matches(name, self.scalers.keys(), n=1)
xval = xvar.varValue
intercept_subtractor = 0
if matches:
coeff, intercept_subtractor = operations.unnormalize_feature(
self.scalers, matches[0], coeff
)
xval = self.scalers[matches[0]].inverse_transform([[xval]])[0][0]
return coeff, xval, intercept_subtractor
def create_single_constraint(
self, c: pulp.LpConstraint, unscale: bool = False
) -> Tuple[float, str]:
"""
Create a single constraint from a linear programming problem.
Parameters
----------
c : pulp.LpConstraint
The constraint from the linear programming problem.
unscale : bool, default=False
Whether to unscale the constraint coefficients and variable values.
Returns
-------
Tuple[float, str]
A tuple containing the result of the constraint equation
and the equation itself as a string.
"""
res = 0
intercept = c.constant
eq = ""
for lp_coeff, lp_var in zip(c.values(), c):
_lp_var = lp_var.varValue
if unscale:
lp_coeff, _lp_var, intercept_subtractor = self.update_variables(
lp_coeff, lp_var
)
intercept -= intercept_subtractor
eq += f'{lp_coeff} * {lp_var.name} + '
res += _lp_var * lp_coeff
eq += f'{intercept} {pulp.LpConstraintSenses[c.sense]} 0'
res += intercept
return res, eq.replace('-', '- ').replace('+ -', '-')
[docs]def replace_symbols(name: str) -> str:
"""Replace certain words in a string with mathematical symbols.
This function takes a string and replaces occurrences of the words 'equal',
'div', and 'mult' with the mathematical symbols '=', '/', and '*',
respectively.
Parameters
----------
name : str
Input string which may contain the words 'equal', 'div', and 'mult' that need
to be replaced with their corresponding mathematical symbols.
Returns
-------
str
Output string after replacing the words with the corresponding
mathematical symbols.
Examples
--------
>>> replace_symbols("x mult y div z equal 10")
'x * y / z = 10'
"""
return name.replace('equal', '=').replace('div', '/').replace('mult', '*')
[docs]def lp_variables_to_excel(
lp_problem: pulp.LpProblem, output_path: str, format: str = "xlsx"
):
"""Save the LP variables to an Excel file."""
lp_results = []
for name, lp_var in lp_problem.variablesDict().items():
lb = lp_var.lowBound
ub = lp_var.upBound
value = lp_var.varValue
cat = lp_var.cat
pisystem_name = get_pisystem_tag_name(name)
lp_results.append([name, value, lb, ub, cat, pisystem_name])
results = pd.DataFrame(
lp_results,
columns=[
"Nome Variável",
"Valor",
"Limite Inferior",
"Limite Superior",
"Tipo de Variável",
"PI System Tag Name",
],
)
filename = Path(output_path).name
output_path = output_path.replace(
str(filename), str(Path(filename).with_suffix(f".{format}"))
)
if format == "xlsx":
to_excel(results, output_path, index=False)
elif format == "csv":
to_csv(results, output_path, index=False)
[docs]def get_pisystem_tag_name(pims_tag_name: str) -> str:
"""Get the PI System tag name from the PIMS tag name.
This function starts by cleaning the `pims_tag_name` string. The cleaning
process performs the following operations:
1. If the `pims_tag_name` has the "@" symbol, then the string is split
into two parts: the first part is the prefix and the second part is the
rest of the string. Then all '_' are replaced with '-' for the second part
of the string. Finally, the prefix and the second part of the string are
rejoined together with the '@' symbol between them.
2. All '___' are replaced with ' - '.
3. All '__' are replaced with '_'.
4. Function removes leading '_'.
After the cleaning process, the function checks if the `pims_tag_name` is
in the `pims_to_pisystem_dict` dictionary. If it's, then the corresponding
PI System tag name is returned. If it's not, then the function tries
to find the key in the `pims_to_pisystem_dict` dictionary that's the most
similar to the `pims_tag_name`. If a similar key is found, then the
corresponding PI System tag name is returned. If a similar key isn't found,
then the function logs an error message and returns the cleaned tag name.
Parameters
----------
pims_tag_name : str
Tag name in the PIMS format.
Returns
-------
str
PI System tag name or clean tag name if the PI System tag name
isn't found in the `pims_to_pisystem_dict` dictionary.
Notes
-----
This function is intended to be used to convert the variables' names
post-optimization, to include them in the final results file.
"""
_pims_tag_name = pims_tag_name
# Underscores ('_') after the '@' symbol are actually dashes ('-').
# PuLP doesn't allow the use of '-' inside variable names, therefore,
# during the optimization process, all '-' were replaced with '_'.
# We only replace the underscores with '-' on the part of the name
# after the '@' symbol, because the prefix of the name actually uses '_'
# as a separator, and therefore, we don't want to replace those underscores.
if '@' in pims_tag_name:
prefix, rest = pims_tag_name.split('@', 1)
rest = rest.replace('_', '-')
pims_tag_name = f'{prefix}@{rest}'
# Function `replace_symbols` replaces math symbols
# converted to words backs to their original symbols.
# For example, this function transforms a name like 'equal192divVELO'
# back to '=192/VELO'.
# This is necessary because, due to a PuLP limitation on the characters
# that can be used in variable names, these names were converted from math
# symbols to text, to conform to the PuLP restrictions.
# As this function is mainly intended to be used to generate the optimization
# results file, it's necessary to include it here.
pims_tag_name = replace_symbols(pims_tag_name).replace('___', ' - ')
while '__' in pims_tag_name:
pims_tag_name = pims_tag_name.replace('__', '_')
while pims_tag_name.startswith('_'):
pims_tag_name = pims_tag_name[1:]
if pims_tag_name in pims_to_pisystem_dict.keys():
return pims_to_pisystem_dict[pims_tag_name]
if _pims_tag_name in pims_to_pisystem_dict.keys():
return pims_to_pisystem_dict[_pims_tag_name]
pims_real_name = difflib.get_close_matches(
pims_tag_name, pims_to_pisystem_dict.keys(), n=1, cutoff=0.6
)
if pims_real_name:
return pims_to_pisystem_dict[pims_real_name[0]]
return pims_tag_name