Source code for rivus.tests.test_db

import unittest
import pyomo.environ  # although is not used direktly, is needee by pyomo
from pyomo.opt.base import SolverFactory
from rivus.main.rivus import read_excel, create_model
from rivus.main.rivus import get_timeseries, get_constants
from rivus.utils.prerun import setup_solver
from rivus.gridder.create_grid import create_square_grid as square_grid
from rivus.gridder.extend_grid import vert_init_commodities
from rivus.gridder.extend_grid import extend_edge_data
from rivus.io import db as rdb
from sqlalchemy import create_engine
import json
import os
pdir = os.path.dirname


[docs]class RivusDBTest(unittest.TestCase):
[docs] def test_df_insert_query(self): """Are the stored dataframes and the retrieved ones identical? - Comparison form of frames is *after* create_model. (index is set) - Comparison form expects that input dataframes only have meaningful columns. (See pull request #23) - Only implemented dataframes are tested. Note ---- Requires a ``config.json`` file in the root of rivus-repo with the database credentials. For Example: :: { "db" : { "user" : "postgres", "pass" : "postgres", "host" : "localhost", "base" : "rivus" } } """ conf_path = os.path.join(pdir(pdir(pdir(__file__))), 'config.json') config = [] with open(conf_path) as conf: config = json.load(conf) # DB connection _user = config['db']['user'] _pass = config['db']['pass'] _host = config['db']['host'] _base = config['db']['base'] engine_string = ('postgresql://{}:{}@{}/{}' .format(_user, _pass, _host, _base)) engine = create_engine(engine_string) proj_name = 'mnl' base_directory = os.path.join('data', proj_name) data_spreadsheet = os.path.join(base_directory, 'data.xlsx') data = read_excel(data_spreadsheet) # data_bup = data.copy() vertex, edge = square_grid() vert_init_commodities(vertex, ['Elec', 'Gas'], [('Elec', 0, 100000)]) extend_edge_data(edge) prob = create_model(data, vertex, edge) solver = SolverFactory(config['solver']) solver = setup_solver(solver, log_to_console=False) solver.solve(prob, tee=True) test_id = rdb.init_run(engine, runner='Unittest') rdb.store(engine, prob, run_id=test_id) this_df = None dfs = data.keys() for df in dfs: if df == 'hub': continue # is not implemented yet this_df = data[df] print(df) re_df = rdb.df_from_table(engine, df, test_id) self.assertTrue(all(this_df.fillna(0) == re_df.reindex(this_df.index).fillna(0)), msg=('{}: Original and retrieved frames' ' are not identical'.format(df))) # Add implemented result dataframes cost, pmax, kappa_hub, kappa_process = get_constants(prob) source, _, _, _, _, = get_timeseries(prob) results = dict(source=source, cost=cost, pmax=pmax, kappa_hub=kappa_hub, kappa_process=kappa_process) dfs = ['source', 'cost', 'pmax', 'kappa_hub', 'kappa_process'] for df in dfs: this_df = results[df] print(df) re_df = rdb.df_from_table(engine, df, test_id) self.assertTrue(all(this_df.fillna(0) == re_df.reindex(this_df.index).fillna(0)), msg=('{}: Original and retrieved frames' ' are not identical'.format(df)))