beer_data/src/flask/beer_data.py
Andrew Ridgway 436836d4ac
All checks were successful
Build and Push Image / Build and push image (push) Has been skipped
get alco prediction and table working
2024-09-16 02:22:42 +00:00

132 lines
5.3 KiB
Python

from bokeh.core.enums import SpatialUnitsType
import mongo.build_db as beer_database
import mongo.query_db as beer_database_query
from table import table_builder
from flask import Flask, render_template, request, jsonify, redirect, session
from flask_wtf import FlaskForm, CSRFProtect
from flask_bootstrap import Bootstrap5
from wtforms import StringField, SubmitField, DateField, IntegerField, PasswordField, DecimalField, RadioField, TextAreaField, BooleanField
from wtforms.validators import DataRequired, Length, Optional
from waitress import serve
from bokeh.models.layouts import HBox
from bokeh.plotting import column
from charts import BeerCharts
from bokeh.io import output_file, show
from bokeh.layouts import row
app = Flask(__name__)
app.secret_key = 'testsecret' #this value will change
bootstrap = Bootstrap5(app)
csrf = CSRFProtect(app)
# used to configure the bokeh plot for graphs
output_file("/beer_data/src/flask/static/data_plot.html")
def create_graphs():
chart = BeerCharts.BeerCharts()
sg = chart.line_chart("SG", "sg", 30)
show(column(sg))
class userForm(FlaskForm):
username = StringField("User Name?", validators=[DataRequired()])
password = PasswordField("Password?")
submit = SubmitField("Letsa GO!")
class dataForm(FlaskForm):
beer_run_id = IntegerField("Beer Run ID")
beer_run_type = TextAreaField("Beer Type")
Date = DateField("Date:")
sg = IntegerField("SG Reading")
final_run = BooleanField("Final Reading?")
comment = TextAreaField("Any Comments?", validators=[Optional()])
submit = SubmitField("Write it, Write it REAAAAAAL GOOOD")
@app.route("/", methods=["GET","POST"])
def index():
form = userForm()
if form.validate_on_submit():
username = form.username.data
password = form.password.data
db = beer_database_query.pool_query()
if db.user_check(username, password):
session['logged_in'] = True
return redirect("/updater")
else:
return render_template("index.html", try_again=True, form=form)
else:
return render_template("index.html", try_again=False, form=form)
@app.route("/updater", methods=["GET", "POST"])
def updater():
if 'logged_in' not in session:
return redirect("/")
predicted_alc_table = table_builder.TableBuilder().table_build()
tr_replace_string = '<tr align="center" style="border-bottom:1pt solid black;">'
beer_html = predicted_alc_table.to_html(col_space='75px', index=False,
justify='center', border=3).replace('<tr>', tr_replace_string)
query_db = beer_database_query.pool_query()
query = query_db.get_top(10, "sg")
form = dataForm()
if form.validate_on_submit():
database = beer_database.beer_data()
new_record = {
"date": f'{form.Date.data}',
"beer_run_id": f'{form.beer_run_id.data}',
"beer_type": f'{form.beer_run_type.data}',
"sg" : f'{form.sg.data}',
"final_reading" : F'{form.final_run.data}',
"comment": f'{form.comment.data}'
}
if database.record_exists(new_record["date"], new_record["beer_run_id"]):
for field in database.existing_record:
for new_field in new_record:
if field == new_field:
if database.existing_record[field] != new_record[field]:
database.update_re_record(database.existing_record["_id"], field, new_record[field])
else:
if new_record["final_reading"] == "True":
final_run_value = True
else:
final_run_value = False
database.create_re_record(new_record["beer_run_id"], new_record["beer_type"], new_record["sg"],
new_record["date"], final_run_value, new_record["comment"])
return render_template("updater.html", beer_data = beer_html
, list=query, form=form, success=True, updater_name = "Saucy Beer Maker")
else:
return render_template("updater.html", beer_data = beer_html
, list=query, form=form, sucess=False)
# @app.route("/update_db", methods=["POST"])
# def beer_data_update():
# database = beer_database.beer_data()
# new_record = request.json
# if database.record_exists(new_record["date"], new_record["test_user"]):
# for field in database.existing_record:
# for new_field in new_record:
# if field == new_field:
# if database.existing_record[field] != new_record[field]:
# database.update_re_record(database.existing_record["_id"], field, new_record[field])
# else:
# database.create_re_record(new_record["beer_run_id"], new_record["beer_type"], new_record["sg"],
# new_record["date"], new_record["comment"])
# @app.route("/pool_top/<int:return_number>/<string:field>")
# def user_detail(id):
# query_db = beer_database_query.pool_query()
# query = query_db.get_top(return_number, field)
# return jsonify([row.to_json() for row in query])
if __name__ == '__main__':
#app.run(host='0.0.0.0')
serve(app, host='0.0.0.0', port=5000, url_scheme='https')