Examples
Using both sync and async calls
import asyncio
import peewee
import peewee_async
database = peewee_async.PostgresqlDatabase('test')
loop = asyncio.get_event_loop()
class TestModel(peewee_async.AioModel):
text = peewee.CharField()
class Meta:
database = database
async with database.allow_sync():
# Create table synchronously!
TestModel.create_table(True)
# sync connection is closed automatically on exit
async def my_handler():
obj1 = TestModel.create(text="Yo, I can do it sync!")
obj2 = await TestModel.aio_create(text="Not bad. Watch this, I'm async!")
all_objects = await TestModel.select().aio_execute()
for obj in all_objects:
print(obj.text)
await TestModel.delete().aio_execute()
loop.run_until_complete(database.connect_async(loop=loop))
loop.run_until_complete(my_handler())
Using transactions
import asyncio
import peewee
import peewee_async
# ... some init code ...
async def test():
obj = await TestModel.aio_create(text='FOO')
obj_id = obj.id
try:
async with database.aio_atomic():
await TestModel.update(text='BAR').where(TestModel.id == obj_id).aio_execute()
raise Exception('Fake error')
except:
res = await TestModel.aio_get(TestModel.id == obj_id)
print(res.text) # Should print 'FOO', not 'BAR'
loop.run_until_complete(test())
Using async peewee with Tornado
Tornado is a mature and powerful asynchronous web framework. It provides its own event loop, but there’s an option to run Tornado on asyncio event loop. And that’s exactly what we need!
The complete working example is provided below. And here are some general notes:
Be aware of current asyncio event loop!
In the provided example we use the default event loop everywhere, and that’s OK. But if you see your application got silently stuck, that’s most probably that some task is started on the different loop and will never complete as long as that loop is not running.
Tornado request handlers does not start asyncio tasks by default.
The
CreateHandlerdemostrates that,current_task()returnsNoneuntil task is run explicitly.
Transactions must run within task context.
All transaction operations have to be done within task. So if you need to run a transaction from Tornado handler, you have to wrap your call into task with
create_task()orensure_future().Also note: if you spawn an extra task during a transaction, it will run outside of that transaction.
import tornado.web
# Set up Tornado application on asyncio
from tornado.platform.asyncio import AsyncIOMainLoop
# Set up database and manager
database = peewee_async.PooledPostgresqlDatabase('test')
# Define model
class TestNameModel(peewee_async.AioModel):
name = peewee.CharField()
class Meta:
database = database
def __str__(self):
return self.name
# Create table, add some instances
TestNameModel.create_table(True)
TestNameModel.get_or_create(id=1, defaults={'name': "TestNameModel id=1"})
TestNameModel.get_or_create(id=2, defaults={'name': "TestNameModel id=2"})
TestNameModel.get_or_create(id=3, defaults={'name': "TestNameModel id=3"})
database.close()
AsyncIOMainLoop().install()
app = tornado.web.Application(debug=True)
app.listen(port=8888)
app.database = database
# Add handlers
class RootHandler(tornado.web.RequestHandler):
"""Accepts GET and POST methods.
POST: create new instance, `name` argument is required
GET: get instance by id, `id` argument is required
"""
async def post(self):
name = self.get_argument('name')
obj = await TestNameModel.aio_create(name=name)
self.write({
'id': obj.id,
'name': obj.name
})
async def get(self):
obj_id = self.get_argument('id', None)
if not obj_id:
self.write("Please provide the 'id' query argument, i.e. ?id=1")
return
try:
obj = await TestNameModel.aio_get(id=obj_id)
self.write({
'id': obj.id,
'name': obj.name,
})
except TestNameModel.DoesNotExist:
raise tornado.web.HTTPError(404, "Object not found!")
class CreateHandler(tornado.web.RequestHandler):
async def get(self):
loop = asyncio.get_event_loop()
task1 = asyncio.Task.current_task() # Just to demonstrate it's None
task2 = loop.create_task(self.get_or_create())
obj = await task2
self.write({
'task1': task1 and id(task1),
'task2': task2 and id(task2),
'obj': str(obj),
'text': "'task1' should be null, "
"'task2' should be not null, "
"'obj' should be newly created object",
})
async def get_or_create(self):
obj_id = self.get_argument('id', None)
async with self.application.database.aio_atomic():
obj, created = await TestNameModel.aio_get_or_create(
id=obj_id,
defaults={'name': "TestNameModel id=%s" % obj_id})
return obj
app.add_handlers('', [
(r"/", RootHandler),
(r"/create", CreateHandler),
])
# Setup verbose logging
log = logging.getLogger('')
log.addHandler(logging.StreamHandler())
log.setLevel(logging.DEBUG)
# Run loop
print("""Run application server http://127.0.0.1:8888
Try GET urls:
http://127.0.0.1:8888?id=1
http://127.0.0.1:8888/create?id=100
Try POST with name=<some text> data:
http://127.0.0.1:8888
^C to stop server""")
loop = asyncio.get_event_loop()
try:
loop.run_forever()
except KeyboardInterrupt:
print(" server stopped")