Dump SQLAlchemy tables to YAML (.yml) files

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#!/usr/bin/env python
# coding: utf-8
from __future__ import division, unicode_literals
import argparse
import decimal
import sys
from os.path import isfile
import yaml
from AccessDataBase import BaseEngine
from sqlalchemy.orm import scoped_session, sessionmaker
# Engine and base object are provided by the project's BaseEngine helper.
engine, base = BaseEngine.create_base()

# Scoped session bound to that engine, with autocommit and autoflush off.
session = scoped_session(
    sessionmaker(bind=engine, autoflush=False, autocommit=False)
)

# Directory that holds one "<table name>.yml" file per table.
FIXTURE_DIR = 'Fixtures2'

# Mapping of table name -> table object taken from the shared metadata.
ALL_TABLES = BaseEngine.Base.metadata.tables
def file_name(name):
    """Return the path of the yml file for ``name`` inside FIXTURE_DIR."""
    pieces = (FIXTURE_DIR, '/', name, ".yml")
    return "".join(pieces)
def dump():
    """Write every non-empty table in ALL_TABLES to its own yml file.

    The rows of each table are turned into a list of dicts and serialised
    with ``yaml.safe_dump`` to the path returned by ``file_name(table_name)``.
    Tables without rows are skipped, so no file is written for them.
    """
    for table_name, table_class in ALL_TABLES.items():
        # Let SQLAlchemy build and quote the SELECT instead of formatting the
        # table name into a raw SQL string by hand.
        result = engine.execute(table_class.select())
        # rowcount is not reliable for SELECT statements, so fetch the rows
        # and test the list itself; fetchall() also exhausts the result.
        to_yaml = [dict(row) for row in result.fetchall()]
        if not to_yaml:
            continue
        # Serialise before opening the file so a failure here does not leave
        # a truncated file behind. UTF-8 bytes are requested explicitly so the
        # bytes-based replace() and the binary write below work the same way
        # on Python 2 and Python 3.
        stream = yaml.safe_dump(
            to_yaml, default_flow_style=False, allow_unicode=True,
            encoding='utf-8')
        with open(file_name(table_name), "wb") as outfile:
            # Separate the top-level list items with a blank line.
            outfile.write(stream.replace(b'\n- ', b'\n\n- '))
        print(table_name + " dumped")
def load():
    """Load the rows stored in the yml files into the database.

    For every table in ALL_TABLES that has a file at
    ``file_name(table_name)``, each record without a truthy ``id`` is
    inserted, and each record with an ``id`` updates the row carrying that id.
    """
    for table_name, table_class in ALL_TABLES.items():
        f = file_name(table_name)
        if not isfile(f):
            continue
        # safe_load only builds plain Python objects and needs no Loader
        # argument; the with-block closes the file handle. Binary mode lets
        # the YAML reader decode the bytes itself.
        with open(f, "rb") as infile:
            data = yaml.safe_load(infile)
        # An empty yml file parses to None; treat it as "no rows".
        for x in data or []:
            if not x.get('id'):
                statement = table_class.insert(values=x)
            else:
                # Restrict the UPDATE to the matching row: without a WHERE
                # clause every row of the table would be overwritten.
                # NOTE(review): a record whose id is not present in the table
                # updates nothing and is not inserted - confirm this is wanted.
                statement = table_class.update(values=x).where(
                    table_class.c.id == x['id'])
            # Each statement is executed exactly once.
            engine.execute(statement)
        print('loaded ' + table_name)
# Command-line entry: choose between listing tables, dumping and loading.
parser = argparse.ArgumentParser()
parser.add_argument(
    "-list", help="Show list of tables in database", action="store_true")
parser.add_argument(
    "-dump", nargs='*', help="Generate yml files from database")
parser.add_argument(
    "-load", nargs='*', help="Load yml files into database")
args = parser.parse_args()

# -dump and -load use nargs='*': a bare option yields [] (falsy) and an absent
# option yields None, so presence has to be tested with "is not None".
# NOTE(review): values given after -dump/-load are parsed but not used;
# dump() and load() take no arguments and process every table.
if args.list:
    for table_name in ALL_TABLES:
        print(table_name)
elif args.dump is not None:
    dump()
else:
    # Covers -load and also a run without any option: loading was performed
    # on every run before, so it remains the default action.
    load()