All checks were successful
Build and Push Image / Build and push image (push) Has been skipped
132 lines
5.3 KiB
Python
132 lines
5.3 KiB
Python
import os

from bokeh.core.enums import SpatialUnitsType
from bokeh.io import output_file, show
from bokeh.layouts import row
from bokeh.models.layouts import HBox
from bokeh.plotting import column
from flask import Flask, render_template, request, jsonify, redirect, session
from flask_bootstrap import Bootstrap5
from flask_wtf import FlaskForm, CSRFProtect
from waitress import serve
from wtforms import StringField, SubmitField, DateField, IntegerField, PasswordField, DecimalField, RadioField, TextAreaField, BooleanField
from wtforms.validators import DataRequired, Length, Optional

import mongo.build_db as beer_database
import mongo.query_db as beer_database_query
from charts import BeerCharts
from table import table_builder
|
|
|
|
|
|
# Flask application object shared by every route in this module.
app = Flask(__name__)

# The session-signing key is taken from the FLASK_SECRET_KEY environment
# variable so that a real secret never has to be committed. The previous
# hard-coded development value remains the fallback, which keeps existing
# setups working unchanged until the variable is provided.
app.secret_key = os.environ.get("FLASK_SECRET_KEY", "testsecret")

# Register the Bootstrap5 and CSRFProtect extensions on the app.
bootstrap = Bootstrap5(app)
csrf = CSRFProtect(app)

# used to configure the bokeh plot for graphs
output_file("/beer_data/src/flask/static/data_plot.html")
|
|
|
|
def create_graphs():
    """Build the "SG" line chart and hand it to Bokeh's ``show``.

    The chart comes from ``BeerCharts.BeerCharts().line_chart("SG", "sg", 30)``
    and is wrapped in a single ``column`` layout before being shown.
    NOTE(review): the meaning of the ``30`` argument is defined by
    ``line_chart`` and is not visible from this module.
    """
    sg_chart = BeerCharts.BeerCharts().line_chart("SG", "sg", 30)
    layout = column(sg_chart)
    show(layout)
|
|
|
|
|
|
class userForm(FlaskForm):
    """Login form: a user name, a password and a submit button."""

    # Only the user name carries a validator (DataRequired); the password
    # field is declared without any validators.
    username = StringField(label="User Name?", validators=[DataRequired()])
    password = PasswordField(label="Password?")
    submit = SubmitField(label="Letsa GO!")
|
|
|
|
class dataForm(FlaskForm):
    """Data-entry form for one reading of a beer run."""

    # Identification of the run the reading belongs to.
    beer_run_id = IntegerField(label="Beer Run ID")
    beer_run_type = TextAreaField(label="Beer Type")

    # The reading itself. ``Date`` keeps its capitalised attribute name
    # because the view reads it as ``form.Date``.
    Date = DateField(label="Date:")
    sg = IntegerField(label="SG Reading")
    final_run = BooleanField(label="Final Reading?")

    # The comment is the only field with an explicit validator (Optional).
    comment = TextAreaField(label="Any Comments?", validators=[Optional()])

    submit = SubmitField(label="Write it, Write it REAAAAAAL GOOOD")
|
|
|
|
@app.route("/", methods=["GET", "POST"])
def index():
    """Serve the login page and process login attempts.

    Returns:
        * ``index.html`` with ``try_again=False`` when the form was not
          submitted or did not validate;
        * ``index.html`` with ``try_again=True`` when ``user_check`` rejects
          the submitted user name / password;
        * a redirect to ``/updater`` (after setting ``session['logged_in']``)
          when ``user_check`` accepts them.
    """
    form = userForm()

    # Nothing valid was submitted: show the plain login page.
    if not form.validate_on_submit():
        return render_template("index.html", try_again=False, form=form)

    username, password = form.username.data, form.password.data
    credentials_ok = beer_database_query.pool_query().user_check(username, password)

    # Rejected credentials: same page again, flagged so it can prompt a retry.
    if not credentials_ok:
        return render_template("index.html", try_again=True, form=form)

    session['logged_in'] = True
    return redirect("/updater")
|
|
|
|
def _form_to_record(form):
    """Flatten a validated ``dataForm`` into the record dict used for storage.

    Every value except ``final_reading`` is stringified exactly as before.
    ``final_reading`` is kept as a real ``bool``: the create path has always
    handed a bool to ``create_re_record``, whereas the update path used to
    compare and write the strings ``'True'`` / ``'False'``.
    NOTE(review): this assumes the stored ``final_reading`` is a bool --
    confirm against ``beer_data.create_re_record``.
    """
    return {
        "date": f'{form.Date.data}',
        "beer_run_id": f'{form.beer_run_id.data}',
        "beer_type": f'{form.beer_run_type.data}',
        "sg": f'{form.sg.data}',
        "final_reading": bool(form.final_run.data),
        "comment": f'{form.comment.data}',
    }


def _save_record(database, record):
    """Update the stored record matching *record*, or create it if absent.

    A record is looked up by its ``date`` and ``beer_run_id``. When one
    exists, only the fields present in both records whose values differ are
    written back; otherwise a new record is created.
    """
    if not database.record_exists(record["date"], record["beer_run_id"]):
        database.create_re_record(
            record["beer_run_id"],
            record["beer_type"],
            record["sg"],
            record["date"],
            record["final_reading"],
            record["comment"],
        )
        return

    existing = database.existing_record
    # Walk the stored record's fields (as the original loop did) and push
    # only the values that actually changed.
    for field in existing:
        if field in record and existing[field] != record[field]:
            database.update_re_record(existing["_id"], field, record[field])


@app.route("/updater", methods=["GET", "POST"])
def updater():
    """Show the data-entry page and store a submitted reading.

    Visitors without ``logged_in`` in their session are redirected to ``/``.
    Otherwise the page is rendered from ``updater.html`` with:

    * ``beer_data`` -- HTML of the table built by ``TableBuilder().table_build()``
      (presumably a pandas DataFrame, given the ``to_html`` arguments -- TODO confirm);
    * ``list`` -- the result of ``pool_query().get_top(10, "sg")``;
    * ``form`` -- the ``dataForm`` instance;
    * ``success`` -- ``True`` after a valid submit was stored, else ``False``.

    The table and the top-10 query are gathered before the form is processed,
    so they reflect the state prior to the current submission.
    """
    if 'logged_in' not in session:
        return redirect("/")

    predicted_alc_table = table_builder.TableBuilder().table_build()
    # Every plain <tr> gets centred content and a bottom border.
    tr_replace_string = '<tr align="center" style="border-bottom:1pt solid black;">'
    beer_html = predicted_alc_table.to_html(
        col_space='75px', index=False, justify='center', border=3
    ).replace('<tr>', tr_replace_string)

    query = beer_database_query.pool_query().get_top(10, "sg")
    form = dataForm()

    if form.validate_on_submit():
        _save_record(beer_database.beer_data(), _form_to_record(form))
        return render_template(
            "updater.html",
            beer_data=beer_html,
            list=query,
            form=form,
            success=True,
            updater_name="Saucy Beer Maker",
        )

    # The keyword used to be misspelled "sucess", so the template never
    # received an explicit ``success`` value on this path.
    return render_template(
        "updater.html",
        beer_data=beer_html,
        list=query,
        form=form,
        success=False,
    )
|
|
|
|
# @app.route("/update_db", methods=["POST"])
|
|
# def beer_data_update():
|
|
# database = beer_database.beer_data()
|
|
# new_record = request.json
|
|
# if database.record_exists(new_record["date"], new_record["test_user"]):
|
|
# for field in database.existing_record:
|
|
# for new_field in new_record:
|
|
# if field == new_field:
|
|
# if database.existing_record[field] != new_record[field]:
|
|
# database.update_re_record(database.existing_record["_id"], field, new_record[field])
|
|
# else:
|
|
# database.create_re_record(new_record["beer_run_id"], new_record["beer_type"], new_record["sg"],
|
|
# new_record["date"], new_record["comment"])
|
|
|
|
|
|
|
|
# @app.route("/pool_top/<int:return_number>/<string:field>")
|
|
# def user_detail(id):
|
|
# query_db = beer_database_query.pool_query()
|
|
# query = query_db.get_top(return_number, field)
|
|
# return jsonify([row.to_json() for row in query])
|
|
|
|
if __name__ == '__main__':
    # Served through waitress; the Flask development server call is kept
    # below for reference only.
    #app.run(host='0.0.0.0')
    serve(
        app,
        host='0.0.0.0',
        port=5000,
        url_scheme='https',
    )
|