pool_data/src/flask/pool_data.py
2024-07-23 15:03:37 +10:00

141 lines
6.3 KiB
Python

import os

from bokeh.core.enums import SpatialUnitsType
from bokeh.io import output_file, show
from bokeh.layouts import row
from bokeh.models.layouts import HBox
from bokeh.plotting import column
from flask import Flask, render_template, request, jsonify, redirect, session
from flask_bootstrap import Bootstrap5
from flask_wtf import FlaskForm, CSRFProtect
from waitress import serve
from wtforms import StringField, SubmitField, DateField, IntegerField, PasswordField, DecimalField, RadioField, TextAreaField
from wtforms.validators import DataRequired, Length, Optional

import mongo.build_db as pool_database
import mongo.query_db as pool_database_query
from charts import PoolCharts
# Flask application object that every route in this module is registered on.
app = Flask(__name__)
# Key used to sign the session cookie. It is read from the environment so the
# real secret never has to live in source control; when the variable is unset
# the historical development value is used so existing setups keep working.
app.secret_key = os.environ.get("POOL_DATA_SECRET_KEY", "testsecret")
bootstrap = Bootstrap5(app)
# Registers Flask-WTF's CSRF protection on the application.
csrf = CSRFProtect(app)
# Tell bokeh which HTML file to write when a plot is shown; create_graphs()
# relies on this having been configured once at import time.
output_file("/pool_data/src/flask/static/data_plot.html")
def create_graphs():
    """Build one line chart per pool reading and hand the grid to bokeh.

    Each chart comes from ``PoolCharts.line_chart(title, field, 30)``; the
    meaning of the trailing ``30`` is defined by that helper (presumably a
    look-back window -- confirm in charts.PoolCharts). The eight charts are
    laid out as two side-by-side columns and passed to ``show``.
    """
    chart = PoolCharts.PoolCharts()
    # (database field, chart title), listed in the order the charts are built.
    series = (
        ("ph", "Pool PH"),
        ("total_chlorine", "Pool Total Chlorine"),
        ("free_chlorine", "Pool Free Chlorine"),
        ("alkalinity", "Alkalinity"),
        ("salt", "Salt"),
        ("temp", "Temperature"),
        ("hardness", "Hardness"),
        ("stabiliser", "Stabiliser"),
    )
    plots = {}
    for field_name, title in series:
        plots[field_name] = chart.line_chart(title, field_name, 30)

    left_column = column(
        plots["ph"],
        plots["total_chlorine"],
        plots["free_chlorine"],
        plots["temp"],
    )
    right_column = column(
        plots["salt"],
        plots["alkalinity"],
        plots["hardness"],
        plots["stabiliser"],
    )
    show(row(left_column, right_column))
class userForm(FlaskForm):
    """Login form: a required user name, a password and a submit button."""

    # Only the user name carries a validator; the password field has none.
    username = StringField(label="User Name?", validators=[DataRequired()])
    password = PasswordField(label="Password?")
    submit = SubmitField(label="Letsa GO!")
class dataForm(FlaskForm):
    """Form for entering one set of pool water test readings.

    The tester and date fields have no validators attached; every reading and
    the comment use ``Optional()`` so they may be left blank.
    """

    # Who performed the test, chosen from a fixed list of names.
    test_user = RadioField(
        label="Tester:",
        choices=["Isabella", "Heather", "Ariah"],
    )
    Date = DateField(label="Date:")

    # Water chemistry readings.
    free_chlorine = IntegerField(label="Free Chlorine:", validators=[Optional()])
    total_chlorine = IntegerField(label="Total Chlorine:", validators=[Optional()])
    alkalinity = DecimalField(label="Alkalinity:", validators=[Optional()])
    PH = DecimalField(label="PH:", validators=[Optional()])
    hardness = IntegerField(label="Hardness", validators=[Optional()])
    stabiliser = IntegerField(label="CYA - Stabliser", validators=[Optional()])
    salt = IntegerField(label="Salt:", validators=[Optional()])
    temp = DecimalField(label="Water Temperature", validators=[Optional()])

    comment = TextAreaField(label="Any Comments?", validators=[Optional()])
    submit = SubmitField(label="Write it, Write it REAAAAAAL GOOOD")
@app.route("/", methods=["GET","POST"])
def index():
    """Show the login page and handle login submissions.

    A plain GET (or a submission that fails form validation) renders
    ``index.html`` with ``try_again=False``. A valid submission is checked with
    ``pool_query().user_check``; on success the session is flagged as logged in
    and the browser is redirected to ``/updater``, otherwise the page is
    rendered again with ``try_again=True``.
    """
    form = userForm()

    # Nothing submitted, or the submission failed validation.
    if not form.validate_on_submit():
        return render_template("index.html", try_again=False, form=form)

    username = form.username.data
    password = form.password.data
    user_db = pool_database_query.pool_query()

    # Credentials rejected: show the form again with the retry hint.
    if not user_db.user_check(username, password):
        return render_template("index.html", try_again=True, form=form)

    session['logged_in'] = True
    return redirect("/updater")
def _record_from_form(form):
    """Return the submitted readings as a dict of strings keyed by DB field name.

    Every value is rendered through an f-string, so a field left blank on the
    form ends up as the literal text ``"None"``.
    NOTE(review): on the update path that text overwrites any previously stored
    value for the field -- confirm this is intended.
    """
    return {
        "ph": f'{form.PH.data}',
        "total_chlorine": f'{form.total_chlorine.data}',
        "free_chlorine": f'{form.free_chlorine.data}',
        "alkalinity": f'{form.alkalinity.data}',
        "salt": f'{form.salt.data}',
        "date": f'{form.Date.data}',
        "test_user": f'{form.test_user.data}',
        "temp": f'{form.temp.data}',
        "hardness": f'{form.hardness.data}',
        "stabiliser": f'{form.stabiliser.data}',
        "comment": f'{form.comment.data}',
    }


def _save_form_record(database, record):
    """Update the stored record for this date/tester, or create it if absent.

    When ``record_exists`` reports a match, only the fields whose stored value
    differs from the submitted one are written back, one call per field.
    """
    if database.record_exists(record["date"], record["test_user"]):
        for field in database.existing_record:
            # Stored-only fields (e.g. "_id") are not in the submission; skip them.
            if field in record and database.existing_record[field] != record[field]:
                database.update_re_record(database.existing_record["_id"], field, record[field])
    else:
        database.create_re_record(record["ph"], record["total_chlorine"], record["free_chlorine"],
                                  record["alkalinity"], record["date"], record["test_user"],
                                  record["temp"], record["hardness"], record["stabiliser"],
                                  record["salt"], record["comment"])


@app.route("/updater", methods=["GET", "POST"])
def updater():
    """Render the data-entry page and store a submitted set of readings.

    Visitors without ``logged_in`` in their session are redirected to ``/``.
    A valid form submission is saved first; the charts and the ``get_top(10,
    "ph")`` listing are produced afterwards so the rendered page reflects the
    record that was just written. The template receives ``list``, ``form``,
    ``success`` and, after a successful save, ``updater_name``.
    """
    if 'logged_in' not in session:
        return redirect("/")

    form = dataForm()
    new_record = None
    if form.validate_on_submit():
        new_record = _record_from_form(form)
        _save_form_record(pool_database.pool_data(), new_record)

    # Build the charts and listing only after any write has happened.
    create_graphs()
    query = pool_database_query.pool_query().get_top(10, "ph")

    if new_record is not None:
        return render_template("updater.html", list=query, form=form, success=True,
                               updater_name=new_record["test_user"])
    # Same template variable name as the success branch (was misspelled "sucess").
    return render_template("updater.html", list=query, form=form, success=False)
@app.route("/update_db", methods=["POST"])
def pool_data_update():
    """Create or update a pool record from a JSON request body.

    The body must be a JSON object containing at least ``date`` and
    ``test_user``. If a record for that pair exists, each stored field whose
    value differs from the submitted one is updated; otherwise a new record is
    created, which requires every reading field to be present.

    Returns:
        ``{"status": "updated"}`` with 200, ``{"status": "created"}`` with 201,
        or an ``{"error": ...}`` payload with 400 when the body is malformed.

    NOTE(review): unlike ``/updater`` this route performs no session check --
    confirm that unauthenticated writes are intended.
    """
    new_record = request.json
    if not isinstance(new_record, dict):
        return jsonify({"error": "request body must be a JSON object"}), 400

    # These two keys identify the record, so both paths need them.
    missing_keys = [key for key in ("date", "test_user") if key not in new_record]
    if missing_keys:
        return jsonify({"error": "missing required fields", "fields": missing_keys}), 400

    database = pool_database.pool_data()
    if database.record_exists(new_record["date"], new_record["test_user"]):
        for field in database.existing_record:
            # Only fields present in the request can change; stored-only
            # fields such as "_id" are skipped.
            if field in new_record and database.existing_record[field] != new_record[field]:
                database.update_re_record(database.existing_record["_id"], field, new_record[field])
        return jsonify({"status": "updated"})

    # Positional order expected by create_re_record.
    create_fields = ("ph", "total_chlorine", "free_chlorine", "alkalinity",
                     "date", "test_user", "temp", "hardness", "stabiliser",
                     "salt", "comment")
    missing_fields = [name for name in create_fields if name not in new_record]
    if missing_fields:
        return jsonify({"error": "missing required fields", "fields": missing_fields}), 400

    database.create_re_record(*[new_record[name] for name in create_fields])
    return jsonify({"status": "created"}), 201
@app.route("/pool_top/<int:return_number>/<string:field>")
def user_detail(return_number, field):
    """Return the result of ``get_top(return_number, field)`` as a JSON array.

    Flask passes the URL variables as keyword arguments, so the parameter
    names must match the ``<int:return_number>`` and ``<string:field>`` parts
    of the route.

    Args:
        return_number: Number of records requested, taken from the URL.
        field: Name of the field to query on, taken from the URL.

    Returns:
        A JSON response holding ``to_json()`` of every record in the result.
    """
    query_db = pool_database_query.pool_query()
    query = query_db.get_top(return_number, field)
    return jsonify([record.to_json() for record in query])
if __name__ == "__main__":
    # Served with waitress; Flask's built-in server (app.run(host='0.0.0.0'))
    # is the alternative that was used before.
    # NOTE(review): url_scheme='https' presumably reflects TLS being terminated
    # in front of this process -- confirm against the deployment.
    waitress_options = {
        "host": '0.0.0.0',
        "port": 5000,
        "url_scheme": 'https',
    }
    serve(app, **waitress_options)