A customer table will have name, address, and phone columns.
{"FirstName": "Alicia", "Address": "Taipei, Taiwan", "Gender": "Female"}
<!-- A document encoded in XML -->
<contact>
  <firstname>Bob</firstname>
  <lastname>Smith</lastname>
  <phone type="Cell">(123) 555-0178</phone>
  <phone type="Work">(890) 555-0133</phone>
  <address>
    <type>Home</type>
    <street1>123 Back St.</street1>
    <city>Boys</city>
    <state>AR</state>
    <zip>32225</zip>
    <country>US</country>
  </address>
</contact>
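Both document formats can be read with Python's standard library. The following is a minimal sketch; the variable names are illustrative, and the XML string is a trimmed copy of the contact document.

```python
import json
import xml.etree.ElementTree as ET

# JSON document: parsed into a plain Python dict
json_doc = '{"FirstName": "Alicia", "Address": "Taipei, Taiwan", "Gender": "Female"}'
person = json.loads(json_doc)
print(person["FirstName"])

# XML document: parsed into an element tree (trimmed copy of the contact example)
xml_doc = """
<contact>
  <firstname>Bob</firstname>
  <lastname>Smith</lastname>
  <phone type="Cell">(123) 555-0178</phone>
  <phone type="Work">(890) 555-0133</phone>
</contact>
"""
contact = ET.fromstring(xml_doc)

# Collect the repeated <phone> elements, keyed by their "type" attribute
phones = {phone.get("type"): phone.text for phone in contact.findall("phone")}
print(contact.findtext("firstname"), phones["Work"])
```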
Database Engine | URL |
---|---|
MySQL | mysql://username:password@hostname/database |
Postgres | postgresql://username:password@hostname/database |
SQLite (Unix) | sqlite:////absolute/path/to/database |
SQLite (Windows) | sqlite:///c:/absolute/path/to/database |
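To make the URL layout concrete, here is a small sketch that splits a server-style URL into its parts with the standard library and builds a SQLite URL from a file path. The helper name describe_db_url is hypothetical; SQLAlchemy does its own URL parsing internally.

```python
import os
from urllib.parse import urlsplit


def describe_db_url(url):
    """Split a server-style database URL into the parts named in the table."""
    parts = urlsplit(url)
    return {
        "engine": parts.scheme,
        "username": parts.username,
        "password": parts.password,
        "hostname": parts.hostname,
        "database": parts.path.lstrip("/"),
    }


info = describe_db_url("mysql://username:password@localhost/database")
print(info["engine"], info["hostname"], info["database"])

# SQLite has no server, so the URL carries a file path instead of a hostname
basedir = os.path.abspath(os.getcwd())
sqlite_url = "sqlite:///" + os.path.join(basedir, "data.sqlite")
print(sqlite_url)
```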
The hostname can be localhost or a remote server.
# hello_db.py
import os
from flask import Flask
from flask_script import Manager
from flask_sqlalchemy import SQLAlchemy
#from flask.ext.sqlalchemy import SQLAlchemy
basedir = os.path.abspath(os.path.dirname(__file__))
app = Flask(__name__)
manager = Manager(app)
app.config["SQLALCHEMY_DATABASE_URI"] =\
    "sqlite:///" + os.path.join(basedir, "data.sqlite")
app.config["SQLALCHEMY_COMMIT_ON_TEARDOWN"] = True
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config["SQLALCHEMY_ECHO"] = True
db = SQLAlchemy(app)
if __name__ == '__main__':
    manager.run()
SQLALCHEMY_COMMIT_ON_TEARDOWN: set to True to enable automatic commits of database changes.
SQLALCHEMY_TRACK_MODIFICATIONS: if set to True, track modifications of objects and emit signals.
SQLALCHEMY_ECHO: if set to True, SQLAlchemy will log all the SQL statements.
The db instance provides a base class for models and helper classes and functions that are used to define their structure.
# hello_db.py
import os
from flask import Flask
from flask_script import Manager
from flask_sqlalchemy import SQLAlchemy
#from flask.ext.sqlalchemy import SQLAlchemy
basedir = os.path.abspath(os.path.dirname(__file__))
app = Flask(__name__)
manager = Manager(app)
app.config["SQLALCHEMY_DATABASE_URI"] =\
"sqlite:///" + os.path.join(basedir, "data.sqlite")
app.config["SQLALCHEMY_COMMIT_ON_TEARDOWN"] = True
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config["SQLALCHEMY_ECHO"] = True
db = SQLAlchemy(app)
# hello_db.py
class Role(db.Model):
    __tablename__ = 'roles'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)
    def __repr__(self):
        return '<Role %r>' % self.name
class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True, index=True)
    role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
    def __repr__(self):
        return '<User %r>' % self.username
if __name__ == '__main__':
    manager.run()
The __tablename__ class variable defines the name of the table in the database.
id and username are attributes of the model (columns of the table).
They are defined as instances of the db.Column class.
The first argument to db.Column is the type of the column.
The primary key column is normally named id.
Type name | Python type | Description |
---|---|---|
Integer | int | Regular integer (32 bits) |
SmallInteger | int | Short-range integer (16 bits) |
BigInteger | int or long | Unlimited precision integer |
Float | float | Floating-point number |
Numeric | decimal.Decimal | Fixed-point number |
String | str | Variable-length string |
Text | str | Variable-length string (optimized for large or unbound length) |
Unicode | unicode | Variable-length Unicode string |
UnicodeText | unicode | Variable-length Unicode string (optimized for large or unbound length) |
Boolean | bool | Boolean value |
Date | datetime.date | Date value |
Time | datetime.time | Time value |
DateTime | datetime.datetime | Date and time value |
Interval | datetime.timedelta | Time interval |
Enum | str | List of string values |
PickleType | Any Python object | Automatic Pickle serialization |
LargeBinary | str | Binary blob |
Option name | Description |
---|---|
primary_key | If set to True, the column is the table’s primary key. |
unique | If set to True, do not allow duplicate values for this column. |
index | If set to True, create an index for this column, so that queries are more efficient. |
nullable | If set to True, allow empty values for this column. |
default | Define a default value for the column. |
Use the db.create_all() function to create a database based on the model classes.
The db.create_all() function will not re-create or update a database table if it already exists in the database.
(venv) $ python hello_db.py shell
>>> from hello_db import db
>>> db.create_all()
>>> db.drop_all()
>>> db.create_all()
>>> db.Model.metadata.tables.keys()
['users', 'roles']
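The "will not update an existing table" behavior is why the session above drops the tables before creating them again. The sketch below reproduces the same effect with the standard sqlite3 module and CREATE TABLE IF NOT EXISTS; it is an analogy under that assumption, not what Flask-SQLAlchemy executes internally.

```python
import sqlite3


def column_names(conn, table):
    """Return the column names of a table using SQLite's table_info pragma."""
    return [row[1] for row in conn.execute(f"PRAGMA table_info({table})")]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, username TEXT)")

# The definition gained an email column, but the table already exists,
# so this statement is skipped and the old layout stays in place.
new_schema = "CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, username TEXT, email TEXT)"
conn.execute(new_schema)
columns_without_drop = column_names(conn, "users")

# Dropping first (like db.drop_all()) lets the new definition take effect,
# at the cost of losing every row in the table.
conn.execute("DROP TABLE users")
conn.execute(new_schema)
columns_after_drop = column_names(conn, "users")

print(columns_without_drop, columns_after_drop)
```

Losing all data on every schema change is the drawback that database migrations address.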
We do not set the id attribute explicitly because Flask-SQLAlchemy will manage the primary keys for us.
Until the objects are written to the database, the id value has not been assigned.
>>> from hello_db import Role
>>> admin_role = Role(name='Admin')
>>> mod_role = Role(name='Moderator')
>>> user_role = Role(name='User')
>>> admin_role.name
'Admin'
>>> admin_role.id
>>>
Use db.session to add objects prepared to be written to the database.
>>> db.session.add(admin_role)
>>> db.session.add(mod_role)
>>> db.session.add(user_role)
# a more concise way
>>> db.session.add_all([admin_role, mod_role, user_role])
db.session()
Use rollback() to discard the previous action and reset the session.
>>> db.session.rollback()
>>> db.session.add_all([admin_role, mod_role, user_role])
Use the commit() method to commit the session and write objects to the database.
>>> db.session.commit()
The id attributes are now set on the model objects.
>>> admin_role.id
1
>>> mod_role.id
2
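The add, rollback, and commit steps map onto an ordinary database transaction. As a rough sketch using the standard sqlite3 module instead of db.session (the table layout is assumed to mirror the roles table), a rolled-back insert leaves nothing behind, while committed inserts receive their primary keys from the database:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE roles (id INTEGER PRIMARY KEY, name TEXT UNIQUE)")
conn.commit()

# Pending change that is discarded, similar to db.session.rollback()
conn.execute("INSERT INTO roles (name) VALUES ('Admin')")
conn.rollback()
count_after_rollback = conn.execute("SELECT COUNT(*) FROM roles").fetchone()[0]

# Pending changes that are written, similar to db.session.commit()
admin_id = conn.execute("INSERT INTO roles (name) VALUES ('Admin')").lastrowid
mod_id = conn.execute("INSERT INTO roles (name) VALUES ('Moderator')").lastrowid
conn.commit()
count_after_commit = conn.execute("SELECT COUNT(*) FROM roles").fetchone()[0]

print(count_after_rollback, admin_id, mod_id, count_after_commit)
```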
The query object is available in each model class.
Option | Description |
---|---|
filter() | Returns a new query that adds an additional filter to the original query |
filter_by() | Returns a new query that adds an additional equality filter to the original query |
limit() | Returns a new query that limits the number of results of the original query to the given number |
offset() | Returns a new query that applies an offset into the list of results of the original query |
order_by() | Returns a new query that sorts the results of the original query according to the given criteria |
group_by() | Returns a new query that groups the results of the original query according to the given criteria |
Option | Description |
---|---|
all() | Returns all the results of a query as a list |
first() | Returns the first result of a query, or None if there are no results |
first_or_404() | Returns the first result of a query, or aborts the request and sends a 404 error as response if there are no results |
get() | Returns the row that matches the given primary key, or None if no matching row is found |
get_or_404() | Returns the row that matches the given primary key. If the key is not found it aborts the request and sends a 404 error as response |
count() | Returns the result count of the query |
paginate() | Returns a Pagination object that contains the specified range of results |
all()
>>> roles = Role.query.all()
>>> roles
[<Role 'Admin'>, <Role 'Moderator'>, <Role 'User'>]
>>> roles[0].name
'Admin'
>>> roles[0].id
1
filter() and filter_by()
filter_by: used for simple queries on the column names using regular keyword arguments. It cannot express comparisons such as > or <.
filter: use == to accomplish the same thing, with the operators overloaded on the model's column attributes.
>>> Role.query.filter_by(name='Admin')
<flask_sqlalchemy.BaseQuery object at 0x103c11510>
>>> str(Role.query.filter_by(name='Admin'))
'SELECT roles.id AS roles_id, roles.name AS roles_name \nFROM roles \nWHERE roles.name = ?'
# filter -> executor
>>> Role.query.filter_by(name='Admin').all()
[<Role 'Admin'>]
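# error: filter_by() only accepts keyword arguments, and id here is the Python builtin
>>> Role.query.filter_by(id > 2).all()
Traceback (most recent call last):
  File "<console>", line 1, in <module>
TypeError: unorderable types: builtin_function_or_method() > int()
# Role.id overloads the comparison operators, so filter() can build the SQL condition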
# filter -> executor
>>> Role.query.filter(Role.id > 2).all()
[<Role 'User'>]
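The reason Role.id > 2 works while a bare id > 2 fails comes down to operator overloading. The toy class below illustrates the idea only; ToyColumn is hypothetical and far simpler than SQLAlchemy's real column objects, which build expression objects rather than strings.

```python
class ToyColumn:
    """Toy stand-in for a model column (hypothetical, not SQLAlchemy's class)."""

    def __init__(self, name):
        self.name = name

    def __gt__(self, other):
        # Instead of comparing, describe the comparison as SQL text
        return f"{self.name} > {other!r}"

    def __eq__(self, other):
        return f"{self.name} = {other!r}"


role_id = ToyColumn("roles.id")
role_name = ToyColumn("roles.name")

expr_gt = role_id > 2
expr_eq = role_name == "Admin"
print(expr_gt)
print(expr_eq)

# The builtin id function defines no such overload, so the comparison fails
try:
    id > 2
    builtin_error = None
except TypeError as exc:
    builtin_error = str(exc)
print(builtin_error)
```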
first()
Returns None if there is no search result.
>>> Role.query.filter_by(name='User').first()
<Role 'User'>
>>> Role.query.filter_by(name='Stranger').first()
>>>
Use the add() method to update any changes to the models.
>>> admin_role.name = 'Administrator'
>>> db.session.add(admin_role)
>>> db.session.commit()
>>> admin_role.name
'Administrator'
Use the delete() method to delete an object from the database.
For example, delete the moderator role from the Role model.
>>> db.session.delete(mod_role)
>>> db.session.commit()
>>> Role.query.all()
[<Role 'Administrator'>, <Role 'User'>]
$ git clone https://github.com/win911/flask_class.git
$ git checkout 6a
If the repository is already cloned, run "$ git pull origin master" first.
(venv) $ python hello_db.py shell
>>> from hello_db import db, User
Command |
---|
db.session.delete() |
db.session.add() |
db.session.add_all() |
db.session.commit() |
User.query.all() |
One-to-one: Both tables can have only one record on either side of the relationship. Each primary key value relates to only one (or no) record in the related table.
One-to-many: One record in a table can relate to many records in the other table, so the foreign key column on the "many" side holds duplicate values.
Many-to-many: Each record in both tables can relate to any number of records (or no records) in the other table. It requires a third table (association table or linking table) to accommodate the relationship. We'll talk about many-to-many relationship in Chapter 12.
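A one-to-many relationship can be seen directly at the SQL level. The following sketch uses the standard sqlite3 module with a schema assumed to resemble the roles and users tables: the role_id column repeats the same value for every user that shares a role, and a join recovers the users of one role.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE roles (id INTEGER PRIMARY KEY, name TEXT UNIQUE);
    CREATE TABLE users (
        id INTEGER PRIMARY KEY,
        username TEXT UNIQUE,
        role_id INTEGER REFERENCES roles(id)  -- foreign key on the "many" side
    );
    INSERT INTO roles (id, name) VALUES (1, 'Admin'), (2, 'User');
    INSERT INTO users (username, role_id)
        VALUES ('Maomao', 1), ('Alicia', 1), ('Abby', 2);
""")

# One role, many users: the value 1 appears twice in users.role_id
admins = [
    row[0]
    for row in conn.execute(
        "SELECT users.username FROM users "
        "JOIN roles ON users.role_id = roles.id "
        "WHERE roles.name = ? ORDER BY users.id",
        ("Admin",),
    )
]
print(admins)
```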
users and roles
A foreign key is declared with the db.ForeignKey class.
The role_id column is defined as a foreign key and establishes a relationship.
The other side of the relationship is declared with the db.relationship() function.
For an instance of Role, the users attribute will return the list of users associated with that role.
# hello_db.py
class Role(db.Model):
    __tablename__ = 'roles'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)
    users = db.relationship('User') # relationship (from Role link to User)
    def __repr__(self):
        return '<Role %r>' % self.name
class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True, index=True)
    role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
    def __repr__(self):
        return '<User %r>' % self.username
db.relationship() can locate the relationship’s foreign key on its own
(venv) $ python hello_db.py shell
>>> from hello_db import db, User, Role
>>> db.drop_all()
>>> db.create_all()
>>> admin_role = Role(name='Admin')
>>> db.session.add(admin_role)
>>> db.session.commit() # need to commit first so that the id can be created
>>> maomao = User(username="Maomao", role_id=admin_role.id)
>>> db.session.add(maomao)
>>> db.session.commit()
>>> admin_role.users
[<User 'Maomao'>]
>>> Alicia = User(username="Alicia", role_id=admin_role.id)
>>> db.session.add(Alicia)
>>> db.session.commit()
>>> admin_role.users
[<User 'Maomao'>, <User 'Alicia'>]
>>>
# hello_db.py
class Role(db.Model):
    __tablename__ = 'roles'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)
    users = db.relationship('User')
    def __repr__(self):
        return '<Role %r>' % self.name
class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True, index=True)
    role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
    role = db.relationship('Role') # relationship (from User link to Role)
    def __repr__(self):
        return '<User %r>' % self.username
(venv) $ python hello_db.py shell
>>> from hello_db import db, User, Role
>>> db.drop_all()
>>> db.create_all()
>>> admin_role = Role(name='Admin')
>>> db.session.add(admin_role)
>>> db.session.commit()
>>> maomao = User(username="Maomao", role_id=admin_role.id)
>>> Alicia = User(username="Alicia", role_id=admin_role.id)
>>> db.session.add_all([maomao, Alicia])
>>> db.session.commit()
>>> admin_role.users
[<User 'Maomao'>, <User 'Alicia'>]
>>> maomao.role
<Role 'Admin'>
>>> Alicia.role
<Role 'Admin'>
backref
The backref argument defines the reverse direction of the relationship.
It adds a role attribute to the User model.
role is not a real database column.
It is the reverse side of users.
With backref, there is no need to declare relationship in both classes.
# hello_db.py
class Role(db.Model):
    __tablename__ = 'roles'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)
    users = db.relationship('User', backref='role')
    def __repr__(self):
        return '<Role %r>' % self.name
class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True, index=True)
    role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
    def __repr__(self):
        return '<User %r>' % self.username
(venv) $ python hello_db.py shell
>>> from hello_db import db, User, Role
>>> db.drop_all()
>>> db.create_all()
>>> admin_role = Role(name='Admin')
>>> db.session.add(admin_role)
>>> db.session.commit()
>>> maomao = User(username="Maomao", role_id=admin_role.id)
>>> Alicia = User(username="Alicia", role_id=admin_role.id)
>>> db.session.add_all([maomao, Alicia])
>>> db.session.commit()
>>> admin_role.users
[<User 'Maomao'>, <User 'Alicia'>]
>>> maomao.role
<Role 'Admin'>
>>> Alicia.role
<Role 'Admin'>
# hello_db.py
class Role(db.Model):
    __tablename__ = 'roles'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)
    def __repr__(self):
        return '<Role %r>' % self.name
class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True, index=True)
    primary_role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
    secondary_role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
    primary_role = db.relationship('Role') # ambiguous
    secondary_role = db.relationship('Role') # ambiguous
    def __repr__(self):
        return '<User %r>' % self.username
Use the foreign_keys argument to resolve the ambiguity.
The User.primary_role relationship will use the value present in primary_role_id to identify the row in Role to be loaded.
The User.secondary_role relationship will use the value present in secondary_role_id to identify the row in Role to be loaded.
# hello_db.py
class Role(db.Model):
    __tablename__ = 'roles'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)
    def __repr__(self):
        return '<Role %r>' % self.name
class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True, index=True)
    primary_role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
    secondary_role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
    primary_role = db.relationship('Role', foreign_keys='User.primary_role_id')
    secondary_role = db.relationship('Role', foreign_keys='User.secondary_role_id')
    def __repr__(self):
        return '<User %r>' % self.username
(venv) $ python hello_db.py shell
>>> from hello_db import db, User, Role
>>> db.drop_all()
>>> db.create_all()
>>> admin_role = Role(name='Admin')
>>> mod_role = Role(name='Mod')
>>> db.session.add_all([admin_role, mod_role])
>>> db.session.commit()
>>> maomao = User(username="Maomao", primary_role_id=admin_role.id, secondary_role_id=mod_role.id)
>>> Alicia = User(username="Alicia", primary_role_id=admin_role.id, secondary_role_id=mod_role.id)
>>> db.session.add_all([maomao, Alicia])
>>> db.session.commit()
>>> maomao.primary_role
<Role 'Admin'>
>>> maomao.secondary_role
<Role 'Mod'>
>>>
# hello_db.py
class Role(db.Model):
    __tablename__ = 'roles'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)
    users_primary_role = db.relationship('User', primaryjoin='Role.id==User.primary_role_id')
    users_secondary_role = db.relationship('User', primaryjoin='Role.id==User.secondary_role_id')
    def __repr__(self):
        return '<Role %r>' % self.name
class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True, index=True)
    primary_role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
    secondary_role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
    primary_role = db.relationship('Role', foreign_keys='User.primary_role_id')
    secondary_role = db.relationship('Role', foreign_keys='User.secondary_role_id')
    def __repr__(self):
        return '<User %r>' % self.username
(venv) $ python hello_db.py shell
>>> from hello_db import db, User, Role
>>> db.drop_all()
>>> db.create_all()
>>> admin_role = Role(name='Admin')
>>> mod_role = Role(name='Mod')
>>> db.session.add_all([admin_role, mod_role])
>>> db.session.commit()
>>> maomao = User(username="Maomao", primary_role_id=admin_role.id, secondary_role_id=mod_role.id)
>>> Alicia = User(username="Alicia", primary_role_id=admin_role.id, secondary_role_id=mod_role.id)
>>> db.session.add_all([maomao, Alicia])
>>> db.session.commit()
>>> maomao.primary_role
<Role 'Admin'>
>>> Alicia.secondary_role
<Role 'Mod'>
>>> admin_role.users_primary_role
[<User 'Maomao'>, <User 'Alicia'>]
>>> admin_role.users_secondary_role
[]
# hello_db.py
class Role(db.Model):
    __tablename__ = 'roles'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)
    users_primary_role = db.relationship('User', backref='primary_role', primaryjoin='Role.id==User.primary_role_id')
    users_secondary_role = db.relationship('User', backref='secondary_role', primaryjoin='Role.id==User.secondary_role_id')
    def __repr__(self):
        return '<Role %r>' % self.name
class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True, index=True)
    primary_role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
    secondary_role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
    def __repr__(self):
        return '<User %r>' % self.username
(venv) $ python hello_db.py shell
>>> from hello_db import db, User, Role
>>> db.drop_all()
>>> db.create_all()
>>> admin_role = Role(name='Admin')
>>> mod_role = Role(name='Mod')
>>> db.session.add_all([admin_role, mod_role])
>>> db.session.commit()
>>> maomao = User(username="Maomao", primary_role_id=admin_role.id, secondary_role_id=mod_role.id)
>>> Alicia = User(username="Alicia", primary_role_id=admin_role.id, secondary_role_id=mod_role.id)
>>> db.session.add_all([maomao, Alicia])
>>> db.session.commit()
>>> maomao.primary_role
<Role 'Admin'>
>>> Alicia.secondary_role
<Role 'Mod'>
>>> admin_role.users_primary_role
[<User 'Maomao'>, <User 'Alicia'>]
>>> admin_role.users_secondary_role
[]
$ git clone https://github.com/win911/flask_class.git
$ git checkout 6b
If the repository is already cloned, run "$ git pull origin master" first.
# add relationship using foreign key
>>> maomao = User(username="Maomao", primary_role_id=admin_role.id, secondary_role_id=mod_role.id)
>>> db.session.add(maomao)
>>> db.session.commit()
# add relationship using back reference
>>> abby = User(username="Abby", primary_role=admin_role, secondary_role=mod_role)
>>> db.session.add(abby)
>>> db.session.commit()
Option name | Description |
---|---|
backref | Add a back reference in the other model in the relationship. |
primaryjoin | Specify the join condition between the two models explicitly. This is necessary only for ambiguous relationships. |
lazy | Specify how the related items are to be loaded. Possible values are select (items are loaded on demand the first time they are accessed), immediate (items are loaded when the source object is loaded), joined (items are loaded immediately, but as a join), subquery (items are loaded immediately, but as a subquery), noload (items are never loaded), and dynamic (instead of loading the items the query that can load them is given). |
uselist | If set to False, use a scalar instead of a list. |
order_by | Specify the ordering used for the items in the relationship. |
secondary | Specify the name of the association table to use in many-to-many relationships. |
secondaryjoin | Specify the secondary join condition for many-to-many relationships when SQLAlchemy cannot determine it on its own. |
Lazy options
# hello_db.py
import os
from flask import Flask, render_template, session, redirect, url_for
from flask_script import Manager
from flask_bootstrap import Bootstrap
from flask_moment import Moment
from flask_wtf import FlaskForm
from wtforms import StringField, SubmitField
from wtforms.validators import Required
from flask_sqlalchemy import SQLAlchemy
basedir = os.path.abspath(os.path.dirname(__file__))
app = Flask(__name__)
app.config['SECRET_KEY'] = 'hard to guess string'
app.config['SQLALCHEMY_DATABASE_URI'] =\
'sqlite:///' + os.path.join(basedir, 'data.sqlite')
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config["SQLALCHEMY_ECHO"] = True
manager = Manager(app)
bootstrap = Bootstrap(app)
moment = Moment(app)
db = SQLAlchemy(app)
# hello_db.py
class Role(db.Model):
    __tablename__ = 'roles'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)
    users = db.relationship('User', backref='role', lazy='dynamic')
    def __repr__(self):
        return '<Role %r>' % self.name
class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True, index=True)
    role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
    def __repr__(self):
        return '<User %r>' % self.username
class NameForm(FlaskForm):
    name = StringField('What is your name?', validators=[Required()])
    submit = SubmitField('Submit')
# hello_db.py
@app.errorhandler(404)
def page_not_found(e):
    return render_template('404.html'), 404
@app.errorhandler(500)
def internal_server_error(e):
    return render_template('500.html'), 500
@app.route('/', methods=['GET', 'POST'])
def index():
    form = NameForm()
    if form.validate_on_submit():
        user = User.query.filter_by(username=form.name.data).first() # query user
        if user is None:
            admin_role = Role.query.filter_by(name='Admin').first() # query role
            user = User(username=form.name.data, role_id=admin_role.id) # add user
            db.session.add(user) # add to database
            session['known'] = False
        else:
            session['known'] = True
        session['name'] = form.name.data
        return redirect(url_for('index'))
    return render_template('index.html', form=form, name=session.get('name'),
                           known=session.get('known', False))
# hello_db.py
@app.route('/db/users')
def list_users_in_db():
    users = User.query.all()
    return "<br>".join([user.username for user in users])
if __name__ == '__main__':
    manager.run()
<!-- templates/index.html -->
{% extends "base.html" %}
{% import "bootstrap/wtf.html" as wtf %}
{% block title %}Flasky{% endblock %}
{% block page_content %}
<div class="page-header">
<h1>Hello, {% if name %}{{ name }}{% else %}Stranger{% endif %}!</h1>
{% if not known %}
<p>Pleased to meet you!</p>
{% else %}
<p>Happy to see you again!</p>
{% endif %}
</div>
{{ wtf.quick_form(form) }}
{% endblock %}
(venv) $ python hello_db.py shell
>>> from hello_db import db, User, Role
>>> db.drop_all()
>>> db.create_all()
>>> admin_role = Role(name='Admin')
>>> mod_role = Role(name='Mod')
>>> db.session.add_all([admin_role, mod_role])
>>> db.session.commit()
>>> maomao = User(username='Maomao', role_id=admin_role.id)
>>> db.session.add(maomao)
>>> db.session.commit()
(venv) $ python hello_db.py runserver
* Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
The make_shell_context() function registers the app, db, Role and User model.
If make_context is not set, it will return app as default.
# hello_db.py
#...
from flask_script import Shell
def make_shell_context():
    return dict(app=app, db=db, User=User, Role=Role)
manager.add_command("shell", Shell(make_context=make_shell_context))
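$ git clone https://github.com/win911/flask_class.git
$ git checkout 6c
If the repository is already cloned, run "$ git pull origin master" first.
$ pip install flask-migrate
Attach the MigrateCommand class to Flask-Script's manager object.
The migration commands are exposed under the name db.
# hello_db.py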
from flask_migrate import Migrate, MigrateCommand
# from flask.ext.migrate import Migrate, MigrateCommand
# ...
migrate = Migrate(app, db)
manager.add_command('db', MigrateCommand)
$ python hello_db.py db init
Automatic migration: the migrate command.
The generated upgrade() and downgrade() look for differences between the model definitions and the current database.
upgrade(): applies the changes to the database.
downgrade(): removes the changes.
Manual migration: the revision command.
It creates a script with empty upgrade() and downgrade() functions that need to be implemented by the developer.
The db migrate command creates an automatic migration script.
$ python hello_db.py db migrate -m "Initial migration"
Apply the migration with the db upgrade command.
For a first migration, this is equivalent to calling db.create_all().
$ python hello_db.py db upgrade
# hello_db.py
class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True, index=True)
    role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
    email = db.Column(db.String(64), unique=True, index=True)
    def __repr__(self):
        return '<User %r>' % self.username
$ python hello_db.py db migrate -m "Add new column in 'users'"
$ python hello_db.py db upgrade
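To show what a pair of upgrade() and downgrade() functions does for the new email column, here is a toy version written against the standard sqlite3 module. It is not Alembic's API (generated migration scripts use Alembic's own operations object), and the helper column_names and the temporary table name users_tmp are hypothetical.

```python
import sqlite3


def column_names(conn, table):
    """Return the column names of a table using SQLite's table_info pragma."""
    return [row[1] for row in conn.execute(f"PRAGMA table_info({table})")]


def upgrade(conn):
    # Apply the change: add the new column, keeping existing rows
    conn.execute("ALTER TABLE users ADD COLUMN email TEXT")


def downgrade(conn):
    # Remove the change: rebuild the table without the email column
    conn.executescript("""
        CREATE TABLE users_tmp (id INTEGER PRIMARY KEY, username TEXT);
        INSERT INTO users_tmp (id, username) SELECT id, username FROM users;
        DROP TABLE users;
        ALTER TABLE users_tmp RENAME TO users;
    """)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.execute("INSERT INTO users (username) VALUES ('Maomao')")
conn.commit()

upgrade(conn)
upgraded = column_names(conn, "users")
downgrade(conn)
downgraded = column_names(conn, "users")
rows_kept = conn.execute("SELECT username FROM users").fetchall()

print(upgraded, downgraded, rows_kept)
```

Unlike dropping and re-creating the tables, both directions keep the existing rows.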
$ git clone https://github.com/win911/flask_class.git
$ git checkout 6d
If the repository is already cloned, run "$ git pull origin master" first.
|-flasky
  |-app/
    |-templates/
    |-static/
    |-main/
      |-__init__.py
      |-errors.py
      |-forms.py
      |-views.py
    |-__init__.py
    |-email.py
    |-models.py
  |-migrations/
  |-tests/
    |-__init__.py
    |-test*.py
  |-venv/
  |-requirements.txt
  |-config.py
  |-manage.py
requirements.txt: run pip freeze > requirements.txt to generate the file, and pip install -r requirements.txt to reinstall the packages.
config.py: stores the configuration settings.
manage.py: launches the application and other application tasks.
Convert hello.py to our application structure
# config.py
import os
basedir = os.path.abspath(os.path.dirname(__file__))
# base class
class Config(object):
    SECRET_KEY = os.environ.get('SECRET_KEY') or 'hard to guess string'
    SQLALCHEMY_COMMIT_ON_TEARDOWN = True
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    MAIL_SERVER = 'smtp.googlemail.com'
    MAIL_PORT = 587
    MAIL_USE_TLS = True
    MAIL_USERNAME = os.environ.get('MAIL_USERNAME')
    MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD')
    FLASKY_MAIL_SUBJECT_PREFIX = '[Flasky]'
    FLASKY_MAIL_SENDER = 'Flasky Admin <flasky@example.com>'
    FLASKY_ADMIN = os.environ.get('FLASKY_ADMIN')
    @staticmethod
    def init_app(app):
        pass
# SQLALCHEMY_DATABASE_URI is assigned different values
class DevelopmentConfig(Config):
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = os.environ.get('DEV_DATABASE_URL') or \
        'sqlite:///' + os.path.join(basedir, 'data-dev.sqlite')
class TestingConfig(Config):
    TESTING = True
    SQLALCHEMY_DATABASE_URI = os.environ.get('TEST_DATABASE_URL') or \
        'sqlite:///' + os.path.join(basedir, 'data-test.sqlite')
class ProductionConfig(Config):
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
        'sqlite:///' + os.path.join(basedir, 'data.sqlite')
config = {
    'development': DevelopmentConfig,
    'testing': TestingConfig,
    'production': ProductionConfig,
    'default': DevelopmentConfig
}
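The config dictionary lets the rest of the application pick a configuration class by name. A stripped-down sketch of that lookup follows; the classes are reduced to one or two settings, and the helper pick_config is hypothetical (manage.py performs the same lookup inline with the FLASK_CONFIG environment variable).

```python
import os


class Config:
    # Shared setting, inherited by every configuration below
    SECRET_KEY = os.environ.get("SECRET_KEY") or "hard to guess string"


class DevelopmentConfig(Config):
    DEBUG = True


class TestingConfig(Config):
    TESTING = True


config = {
    "development": DevelopmentConfig,
    "testing": TestingConfig,
    "default": DevelopmentConfig,
}


def pick_config(name=None):
    """Resolve a configuration class: explicit name, then FLASK_CONFIG, then default."""
    return config[name or os.getenv("FLASK_CONFIG") or "default"]


testing = pick_config("testing")
print(testing.__name__, testing.TESTING, bool(testing.SECRET_KEY))
```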
Application package constructor (__init__)
|-app/
  |-templates/
  |-static/
  |-main/
    |-__init__.py
    |-errors.py
    |-forms.py
    |-views.py
  |-__init__.py
  |-email.py
  |-models.py
# app/__init__.py (Application package constructor)
from flask import Flask
from flask_bootstrap import Bootstrap
from flask_mail import Mail
from flask_moment import Moment
from flask_sqlalchemy import SQLAlchemy
from config import config
bootstrap = Bootstrap()
mail = Mail()
moment = Moment()
db = SQLAlchemy()
# application factory
def create_app(config_name):
    app = Flask(__name__)
    app.config.from_object(config[config_name])
    config[config_name].init_app(app)
    bootstrap.init_app(app)
    mail.init_app(app)
    moment.init_app(app)
    db.init_app(app)
    # attach routes and custom error pages blueprint here
    return app
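The key idea of the factory is two-phase initialization: extensions are created without an application and bound later through init_app(). The toy sketch below shows only that pattern, without Flask; FakeApp and FakeExtension are hypothetical stand-ins, not real Flask classes.

```python
class FakeApp:
    """Stand-in for a Flask application (hypothetical)."""

    def __init__(self, config):
        self.config = dict(config)
        self.extensions = {}


class FakeExtension:
    """Stand-in for an extension object such as SQLAlchemy() (hypothetical)."""

    def __init__(self, name):
        self.name = name

    def init_app(self, app):
        # Phase two: bind the already-created extension to a concrete app
        app.extensions[self.name] = self


# Phase one: created at import time, before any application exists
db = FakeExtension("db")


def create_app(config):
    """Application factory: build and configure a fresh app on every call."""
    app = FakeApp(config)
    db.init_app(app)
    return app


dev_app = create_app({"DEBUG": True})
test_app = create_app({"TESTING": True})
print(dev_app is test_app, dev_app.extensions["db"] is test_app.extensions["db"])
```

Because each call returns a new application, tests can create one instance per configuration instead of sharing a global app.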
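Because the application is now created at runtime, the app.route decorator is no longer available at import time to register the route; a blueprint fills that role.
|-main/
  |-__init__.py
  |-errors.py
  |-forms.py
  |-views.py
Create the main blueprint.
# app/main/__init__.py (Blueprint creation)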
from flask import Blueprint
main = Blueprint('main', __name__)
# can use from main import views
from . import views, errors
Blueprint endpoints are namespaced, which affects the url_for() function.
Endpoint name: index -> main.index
url_for('index') -> url_for('main.index')
url_for('.index'): the blueprint for the current request is used.
# app/main/views.py (Blueprint with application routes)
from flask import render_template, session, redirect, url_for, current_app
from .. import db
from ..models import User
from ..email import send_email
from . import main
from .forms import NameForm
@main.route('/', methods=['GET', 'POST'])
def index():
    form = NameForm()
    if form.validate_on_submit():
        user = User.query.filter_by(username=form.name.data).first()
        if user is None:
            user = User(username=form.name.data)
            db.session.add(user)
            session['known'] = False
            if current_app.config['FLASKY_ADMIN']:
                send_email(current_app.config['FLASKY_ADMIN'], 'New User',
                           'mail/new_user', user=user)
        else:
            session['known'] = True
        session['name'] = form.name.data
        return redirect(url_for('.index'))
    return render_template('index.html',
                           form=form, name=session.get('name'),
                           known=session.get('known', False))
errorhandler: only invoked for errors that originate in the blueprint.
app_errorhandler: invoked for errors application-wide.
# app/main/errors.py (Blueprint with error handlers)
from flask import render_template
from . import main
@main.app_errorhandler(404)
def page_not_found(e):
    return render_template('404.html'), 404
@main.app_errorhandler(500)
def internal_server_error(e):
    return render_template('500.html'), 500
Register the blueprint inside the create_app() factory function.
# app/__init__.py
from flask import Flask
from flask_bootstrap import Bootstrap
from flask_mail import Mail
from flask_moment import Moment
from flask_sqlalchemy import SQLAlchemy
from config import config
bootstrap = Bootstrap()
mail = Mail()
moment = Moment()
db = SQLAlchemy()
def create_app(config_name):
    app = Flask(__name__)
    app.config.from_object(config[config_name])
    config[config_name].init_app(app)
    bootstrap.init_app(app)
    mail.init_app(app)
    moment.init_app(app)
    db.init_app(app)
    # blueprint registration
    from .main import main as main_blueprint
    app.register_blueprint(main_blueprint)
    return app
Keep forms inside the blueprint as a module.
# app/main/forms.py
from flask_wtf import FlaskForm
from wtforms import StringField, SubmitField
from wtforms.validators import Required
class NameForm(FlaskForm):
    name = StringField('What is your name?', validators=[Required()])
    submit = SubmitField('Submit')
# app/models.py
from . import db
class Role(db.Model):
    __tablename__ = 'roles'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)
    users = db.relationship('User', backref='role', lazy='dynamic')
    def __repr__(self):
        return '<Role %r>' % self.name
class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True, index=True)
    role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
    def __repr__(self):
        return '<User %r>' % self.username
# app/email.py
from threading import Thread
from flask import current_app, render_template
from flask_mail import Message
from . import mail
def send_async_email(app, msg):
    # the worker thread needs its own application context
    with app.app_context():
        mail.send(msg)
def send_email(to, subject, template, **kwargs):
    app = current_app._get_current_object()
    msg = Message(app.config['FLASKY_MAIL_SUBJECT_PREFIX'] + ' ' + subject,
                  sender=app.config['FLASKY_MAIL_SENDER'], recipients=[to])
    msg.body = render_template(template + '.txt', **kwargs)
    msg.html = render_template(template + '.html', **kwargs)
    thr = Thread(target=send_async_email, args=[app, msg])
    thr.start()
    return thr
|-app/
  |-templates/
  |-static/
  |-main/
    |-__init__.py
    |-errors.py
    |-forms.py
    |-views.py
  |-__init__.py
  |-email.py
  |-models.py
Launch the application with ./manage.py.
#!/usr/bin/env python
# manage.py
import os
from app import create_app, db
from app.models import User, Role
from flask_script import Manager, Shell
from flask_migrate import Migrate, MigrateCommand
app = create_app(os.getenv('FLASK_CONFIG') or 'default')
manager = Manager(app)
migrate = Migrate(app, db)
def make_shell_context():
    return dict(app=app, db=db, User=User, Role=Role)
manager.add_command("shell", Shell(make_context=make_shell_context))
manager.add_command('db', MigrateCommand)
if __name__ == '__main__':
    manager.run()
Put test_basics.py in the tests folder.
setUp() and tearDown() run before and after each test.
Methods whose names begin with test_ are executed as tests.
|-tests/
  |-__init__.py
  |-test_basics.py
Add a tests/__init__.py file to make the tests folder a package.
The unittest package will scan all the modules and locate the tests automatically.
Learn more about the unittest package: https://docs.python.org/2/library/unittest.html
# tests/test_basics.py
import unittest
from flask import current_app
from app import create_app, db
class BasicsTestCase(unittest.TestCase):
    def setUp(self):
        self.app = create_app('testing')
        self.app_context = self.app.app_context()
        self.app_context.push()
        db.create_all()
    def tearDown(self):
        db.session.remove()
        db.drop_all()
        self.app_context.pop()
    # Ensures that application instance exists
    def test_app_exists(self):
        self.assertFalse(current_app is None)
    # Ensures that the application is running testing config
    def test_app_is_testing(self):
        self.assertTrue(current_app.config['TESTING'])
test command
Add a test command to the manage.py script.
# manage.py
@manager.command
def test():
    """Run the unit tests."""
    import unittest
    tests = unittest.TestLoader().discover('tests')
    unittest.TextTestRunner(verbosity=2).run(tests)
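The loader and runner used by the test command can be tried on their own, without Flask. This self-contained sketch assumes a throwaway in-memory SQLite table in place of the application database; RolesTestCase is a hypothetical test case, and it is loaded directly instead of being discovered from a tests folder.

```python
import sqlite3
import unittest


class RolesTestCase(unittest.TestCase):
    def setUp(self):
        # Runs before each test: every test gets a fresh, empty database
        self.conn = sqlite3.connect(":memory:")
        self.conn.execute("CREATE TABLE roles (id INTEGER PRIMARY KEY, name TEXT)")

    def tearDown(self):
        # Runs after each test, even when the test fails
        self.conn.close()

    def test_table_starts_empty(self):
        count = self.conn.execute("SELECT COUNT(*) FROM roles").fetchone()[0]
        self.assertEqual(count, 0)

    def test_insert_role(self):
        self.conn.execute("INSERT INTO roles (name) VALUES ('Admin')")
        names = [row[0] for row in self.conn.execute("SELECT name FROM roles")]
        self.assertEqual(names, ["Admin"])


# Same runner as the test command, with the test case loaded explicitly
suite = unittest.TestLoader().loadTestsFromTestCase(RolesTestCase)
result = unittest.TextTestRunner(verbosity=2).run(suite)
print(result.testsRun, result.wasSuccessful())
```

The two tests do not interfere with each other because setUp() rebuilds the table each time.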
# config.py
class DevelopmentConfig(Config):
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = os.environ.get('DEV_DATABASE_URL') or \
        'sqlite:///' + os.path.join(basedir, 'data-dev.sqlite')
class TestingConfig(Config):
    TESTING = True
    SQLALCHEMY_DATABASE_URI = os.environ.get('TEST_DATABASE_URL') or \
        'sqlite:///' + os.path.join(basedir, 'data-test.sqlite')
class ProductionConfig(Config):
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
        'sqlite:///' + os.path.join(basedir, 'data.sqlite')
$ git clone https://github.com/win911/flask_class.git
$ git checkout 7a
If the repository is already cloned, run "$ git pull origin master" first.