我有一个 SqlAlchemy/Flask 应用程序。在其中,我有一个名为 MyModelA
的现有模型.这是它的样子:
class MyModelA(db.Model):
a_id = db.Column(db.Integer, nullable=False, primary_key=True)
my_field1 = db.Column(db.String(1024), nullable=True)
现在,我要添加一个子模型 MyModelB
.这是它的样子:
class MyModelB(db.Model):
b_id = db.Column(db.Integer, nullable=False, primary_key=True)
a_id = db.Column(db.Integer, db.ForeignKey(MyModelA.a_id), nullable=False)
my_field2 = db.Column(db.String(1024), nullable=True)
然后我运行 python manage.py migrate
.这是迁移文件中显示的内容:
def upgrade():
op.create_table('my_model_b',
sa.Column('b_id', sa.Integer(), nullable=False),
sa.Column('a_id', sa.Integer(), nullable=False),
sa.Column('my_field2', sa.String(length=1024), nullable=True),
sa.ForeignKeyConstraint(['a_id'], [u'my_model_a.a_id'], ),
sa.PrimaryKeyConstraint('b_id')
)
def downgrade():
op.drop_table('my_table_b')
我想编辑此迁移,以便它适用于 MyModelA
的每个实例, 实例的子记录 MyModelB
应该用 MyModelB.my_field2
创 build 置为 MyModelA.my_field1
.我该怎么做?
请出示升级和降级代码。
请您参考如下方法:
编辑:
你可以为一次性迁移做这样的事情:
db.engine.execute("INSERT INTO model_b (a_id) select a_id from model_a");
如果你真的想要 sqlalschemy 代码:
for model in db.query(ModelA).all()
db.session.add(ModelB(a_id=model.id))
db.session.commit()
上一个答案:
您所描述的不是您通常在迁移中所做的事情。迁移更改/创建数据库的结构。如果每次创建新的 MyModelA 时都需要它发生,这听起来更像是事件:http://docs.sqlalchemy.org/en/latest/orm/events.html#session-events
class MyModelA(db.Model):
...
@sqlalchemy.event.listens_for(SignallingSession, 'before_flush')
def insert_model_b(session, transaction, instances):
for instance in session.new:
if isinstance(instance, MyModelA):
model_b = MyModelB(a=instance)
session.add(model_b)
此外,您的架构需要显示该关系(不仅仅是外键),以便您可以将尚未插入的 model_a 分配给 model_b.a:
class MyModelB(db.Model):
b_id = db.Column(db.Integer, nullable=False, primary_key=True)
a_id = db.Column(db.Integer, db.ForeignKey(MyModelA.a_id), nullable=False)
a = relationship("MyModelA")
my_field2 = db.Column(db.String(1024), nullable=True)
完整代码示例:
import sqlalchemy
from sqlalchemy.orm import relationship
from flask import Flask
from flask.ext.sqlalchemy import SQLAlchemy
from flask.ext.sqlalchemy import SignallingSession
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'
app.config['SQLALCHEMY_ECHO'] = True
db = SQLAlchemy(app)
class MyModelA(db.Model):
__tablename__ = 'model_a'
a_id = db.Column(db.Integer, nullable=False, primary_key=True)
my_field1 = db.Column(db.String(1024), nullable=True)
class MyModelB(db.Model):
__tablename__ = 'model_b'
b_id = db.Column(db.Integer, nullable=False, primary_key=True)
a_id = db.Column(db.Integer, db.ForeignKey(MyModelA.a_id), nullable=False)
a = relationship(MyModelA)
my_field2 = db.Column(db.String(1024), nullable=True)
@sqlalchemy.event.listens_for(SignallingSession, 'before_flush')
def insert_model_b(session, transaction, instances):
for instance in session.new:
if isinstance(instance, MyModelA):
model_b = MyModelB(a=instance)
session.add(model_b)
db.create_all()
model_a = MyModelA()
db.session.add(model_a)
db.session.commit()