The Pylons Project is looking for GSoC 2012 Students!

There is currently 4 days left for students to apply to help out the Python Software Foundation and the Pylons project, so help get the word out!

Students can register here: http://www.google-melange.com/gsoc/org/google/gsoc2012/python

We are also looking for ideas for GSoC, so please add to and modify the list here:

https://github.com/Pylons/pyramid/wiki/GSoC-2012-Ideas

Writing tests for Pyramid and SQLAlchemy

TL;DR: Putting it all together, the full code can be found here: https://gist.github.com/1420255

Intro

Pyramid's documentation doesn't cover the preferred way to test with SQLAlchemy, because Pyramid tries to stay out of your way and allow you to make your own decisions. However, I feel i'ts necessary to document what I think is the best way to test.

When I first started writing tests with SQLAlchemy I found plenty of examples of how to to get started by doing something like this:

from db import session # probably a contextbound sessionmaker
from db import model

from sqlalchemy import create_engine

def setup():
    engine = create_engine('sqlite:///test.db')
    session.configure(bind=engine)
    model.metadata.create_all(engine)

def teardown():
    model.metadata.drop_all(engine)

def test_something():
    pass

I have seen this done so many times, but I feel there is so much wrong with it! So let's establish some base rules when testing:

  • Always test your system like it would be used in production. SQLite does not enforce the same rules or have the same features as Postgres or MySQL and will allow tests to pass that would otherwise fail in production.
  • Tests should be fast! You should be writing tests for all your code. This is the main reason people do test against SQLite, but we can't violate rule number one. We have to make sure tests against Postgres are fast, so we shouldn't be tearing down and recreating tables for every single test.
  • You should be able to execute in parallel to speed up when you have thousands of tests. Dropping and creating tables per test would not work in a parallel environment.

For an example, I have a project with 600+ tests and it would take 2 and half minutes to execute running against SQLite. But when we swapped our test configuration to execute against Postgres, testing took well over an hour. That is unacceptable!

But running them in parallel will give us a huge speed up. Check out the results of the tests running in single proc mode vs using all 4 cores:

$ py.test
======= 616 passed in 143.67 seconds =======

$ py.test -n4
======= 616 passed in 68.12 seconds =======

The right way

So what is the proper way to setup your tests? You should initialize the database when you start your test runner and then use transactions to rollback any data changes your tests made. This allows you to keep a clean database for each test in a very efficient way.

In py.test, you just have to create a file called conftest.py that looks similar to:

import os

ROOT_PATH = os.path.dirname(__file__)

def pytest_sessionstart():
    from py.test import config

    # Only run database setup on master (in case of xdist/multiproc mode)
    if not hasattr(config, 'slaveinput'):
        from models import initialize_sql
        from pyramid.config import Configurator
        from paste.deploy.loadwsgi import appconfig
        from sqlalchemy import engine_from_config
        import os

        ROOT_PATH = os.path.dirname(__file__)
        settings = appconfig('config:' + os.path.join(ROOT_PATH, 'test.ini'))
        engine = engine_from_config(settings, prefix='sqlalchemy.')

        print 'Creating the tables on the test database %s' % engine

        config = Configurator(settings=settings)
        initialize_sql(settings, config)

With py.test, when you are running in parallel mode, the pytest_sessionstart hook gets fired for each node, so we check that we are on the master node. Then we just grab our test.ini configuration file and execute the initialize_sql function.

Now that you have your initial test configuration finished, you have to define a base test class that does the transaction management in setUp and teardown.

First, lets setup the Base testing class what will manage our transactions:

import unittest
from pyramid import testing
from paste.deploy.loadwsgi import appconfig

from webtest import TestApp
from mock import Mock

from sqlalchemy import engine_from_config
from sqlalchemy.orm import sessionmaker
from app.db import Session
from app.db import Entity  # base declarative object
from app import main
import os
here = os.path.dirname(__file__)
settings = appconfig('config:' + os.path.join(here, '../../', 'test.ini'))

class BaseTestCase(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.engine = engine_from_config(settings, prefix='sqlalchemy.')
        cls.Session = sessionmaker()

    def setUp(self):
        connection = self.engine.connect()

        # begin a non-ORM transaction
        self.trans = connection.begin()

        # bind an individual Session to the connection
        Session.configure(bind=connection)
        self.session = self.Session(bind=connection)
        Entity.session = self.session

    def tearDown(self):
        # rollback - everything that happened with the
        # Session above (including calls to commit())
        # is rolled back.
        testing.tearDown()
        self.trans.rollback()
        self.session.close()

This base test case will wrap all your sessions in an external transaction so that you still have the ability to call flush/commit/etc and it will still be able to rollback any data changes you make.

Unit Tests

Now there are a few different types of tests you will want to run. First, you will want to do unit tests, which are small tests that only test 1 thing at a time. This means you will skip the routes, templates, etc. So let's setup our Unit Test Base class:

class UnitTestBase(BaseTestCase):
    def setUp(self):
        self.config = testing.setUp(request=testing.DummyRequest())
        super(UnitTestBase, self).setUp()

    def get_csrf_request(self, post=None):
        csrf = 'abc'

        if not u'csrf_token' in post.keys():
            post.update({
                'csrf_token': csrf
            })

        request = testing.DummyRequest(post)

        request.session = Mock()
        csrf_token = Mock()
        csrf_token.return_value = csrf

        request.session.get_csrf_token = csrf_token

        return request

We built in a utility function to help us test requests that require a csrf token as well. Here is how we would use this class:

class TestViews(UnitTestBase):
    def test_login_fails_empty(self):
        """ Make sure we can't login with empty credentials"""
        from app.accounts.views import LoginView
        self.config.add_route('index', '/')
        self.config.add_route('dashboard', '/')

        request = testing.DummyRequest(post={
            'submit': True,
        })

        view = LoginView(request)
        response = view.post()
        errors = response['errors']

        assert errors[0].node.name == u'csrf_token'
        assert errors[0].msg == u'Required'
        assert errors[1].node.name == u'Username'
        assert errors[1].msg == u'Required'
        assert errors[2].node.name == u'Password'
        assert errors[2].msg == u'Required'


    def test_login_succeeds(self):
        """ Make sure we can login """
        admin = User(username='sontek', password='temp', kind=u'admin')
        admin.activated = True
        self.session.add(admin)
        self.session.flush()

        from app.accounts.views import LoginView
        self.config.add_route('index', '/')
        self.config.add_route('dashboard', '/dashboard')

        request = self.get_csrf_request(post={
                'submit': True,
                'Username': 'sontek',
                'Password': 'temp',
            })

        view = LoginView(request)
        response = view.post()

        assert response.status_int == 302

Integration Tests

The second type of test you will want to write is an integration test. This will integrate with the whole web framework and actually hit the define routes, render the templates, and actually test the full stack of your application.

Luckily this is pretty easy to do with Pyramid using WebTest:

class IntegrationTestBase(BaseTestCase):
    @classmethod
    def setUpClass(cls):
        cls.app = main({}, **settings)
        super(IntegrationTestBase, cls).setUpClass()

    def setUp(self):
        self.app = TestApp(self.app)
        self.config = testing.setUp()
        super(IntegrationTestBase, self).setUp()

In setUpClass, we run the main function of the applications __init__.py that sets up the WSGI application and then we wrap it in a TestApp that gives us the ability to call get/post on it.

Here is an example of it in use:

class TestViews(IntegrationTestBase):
    def test_get_login(self):
        """ Call the login view, make sure routes are working """
        res = self.app.get('/login')
        self.assertEqual(res.status_int, 200)

    def test_empty_login(self):
        """ Empty login fails """
        res = self.app.post('/login', {'submit': True})

        assert "There was a problem with your submission" in res.body
        assert "Required" in res.body
        assert res.status_int == 200

    def test_valid_login(self):
        """ Call the login view, make sure routes are working """
        admin = User(username='sontek', password='temp', kind=u'admin')
        admin.activated = True
        self.session.add(admin)
        self.session.flush()

        res = self.app.get('/login')

        csrf = res.form.fields['csrf_token'][0].value

        res = self.app.post('/login',
            {
                'submit': True,
                'Username': 'sontek',
                'Password': 'temp',
                'csrf_token': csrf
            }
        )

        assert res.status_int == 302

Problems with this approach

If a test causes an error that will prevent the transaction from rolling back, such as closing the engine, then this approach will leave your database in a state that might cause other tests to fail.

If this happens tracing the root cause could be difficult but you should be able to just look at the first failed test unless you are running the tests in parallel.

If you are good about writing and running your tests regularly you should be able to catch individual tests causing issues like this fairly quickly.

Turning Vim into a modern Python IDE

TL;DR:

$ git clone https://github.com/sontek/dotfiles.git
$ cd dotfiles
$ ./install.sh vim

Download PDF Version

Intro

Back in 2008, I wrote the article Python with a modular IDE (Vim). Years later, I have people e-mailing me and commenting daily asking for more information, even though most of the information in it is outdated. Here is the modern way to work with Python and Vim to achieve the perfect environment.

Because one of the most important parts about a development environment is the ability to easily reproduce across machines, we are going to store our vim configuration in git:

$ mkdir ~/.vim/
$ mkdir ~/.vim/{autoload,bundle}
$ cd ~/.vim/
$ git init

The purpose of the autoload directory is to automatically load the vim plugin Pathogen, which we'll then use to load all other plugins that are located in the bundle directory. So download pathogen and put it in your autoload folder.

You'll need to add the following to your ~/.vimrc so that pathogen will be loaded properly. Filetype detection must be off when you run the commands so its best to execute them first:

filetype off
call pathogen#runtime_append_all_bundles()
call pathogen#helptags()

Now lets add all of the vim plugins we plan on using as submodules to our git repository:

git submodule add http://github.com/tpope/vim-fugitive.git bundle/fugitive
git submodule add https://github.com/msanders/snipmate.vim.git bundle/snipmate
git submodule add https://github.com/tpope/vim-surround.git bundle/surround
git submodule add https://github.com/tpope/vim-git.git bundle/git
git submodule add https://github.com/ervandew/supertab.git bundle/supertab
git submodule add https://github.com/sontek/minibufexpl.vim.git bundle/minibufexpl
git submodule add https://github.com/wincent/Command-T.git bundle/command-t
git submodule add https://github.com/mitechie/pyflakes-pathogen.git
git submodule add https://github.com/mileszs/ack.vim.git bundle/ack
git submodule add https://github.com/sjl/gundo.vim.git bundle/gundo
git submodule add https://github.com/fs111/pydoc.vim.git bundle/pydoc
git submodule add https://github.com/vim-scripts/pep8.git bundle/pep8
git submodule add https://github.com/alfredodeza/pytest.vim.git bundle/py.test
git submodule add https://github.com/reinh/vim-makegreen bundle/makegreen
git submodule add https://github.com/vim-scripts/TaskList.vim.git bundle/tasklist
git submodule add https://github.com/vim-scripts/The-NERD-tree.git bundle/nerdtree
git submodule add https://github.com/sontek/rope-vim.git bundle/ropevim
git submodule init
git submodule update
git submodule foreach git submodule init
git submodule foreach git submodule update

Thats it! Now that we've got our vim configuration in git!

Now lets look at how to use each of these plugins to improve the power of vim:

Basic Editing and Debugging

Code Folding

Lets first enable code folding. This makes it a lot easier to organize your code and hide portions that you aren't interested in working on. This is quite easy for Python, since whitespace is required.

In your ~/.vimrc just add:

set foldmethod=indent
set foldlevel=99

Then you will be able to be inside a method and type 'za' to open and close a fold.

Window Splits

Sometimes code folding isn't enough; you may need to start opening up multiple windows and working on multiple files at once or different locations within the same file. To do this in vim, you can use these shortcuts:

Vertical Split : Ctrl+w + v
Horizontal Split: Ctrl+w + s
Close current windows: Ctrl+w + q

I also like to bind Ctrl+<movement> keys to move around the windows, instead of using Ctrl+w + <movement>:

map <c-j> <c-w>j
map <c-k> <c-w>k
map <c-l> <c-w>l
map <c-h> <c-w>h
http://i.imgur.com/krj0l.png

Snippets

The next tweak that really speeds up development is using snipmate. We've already included it in our bundle/ folder so its already enabled. Try opening up a python file and typing 'def<tab>'. It should stub out a method definition for you and allow you to tab through and fill out the arguments, doc string, etc.

I also like to create my own snippets folder to put in some custom snippets:

$ mkdir ~/.vim/snippets
$ vim ~/.vim/snippets/python.snippets

Put this in the file:

snippet pdb
    import pdb; pdb.set_trace()

Now you can type pdb<tab> and it'll insert your breakpoint!

Task lists

Another really useful thing is to mark some of your code as TODO or FIXME! I know we all like to think we write perfect code, but sometimes you just have to settle and leave a note for yourself to come back later. One of the plugins we included was the tasklist plugin that will allow us to search all open buffers for things to fix. Just add a mapping to open it in ~/.vimrc:

map <leader>td <Plug>TaskList

Now you can hit <leader>td to open your task list and hit 'q' to close it. You can also hit enter on the task to jump to the buffer and line that it is placed on.

Revision History

The final basic editing tweak I suggest everyone start utilizing is the Gundo plugin. It'll allow you to view diff's of every save on a file you've made and allow you to quickly revert back and forth:

http://i.imgur.com/2NrPS.png

Just bind a key in your .vimrc to toggle the Gundo window:

map <leader>g :GundoToggle<CR>

Syntax Highlighting and Validation

Simply enable syntax highlighting in your ~/.vimrc:

syntax on                           " syntax highlighing
filetype on                          " try to detect filetypes
filetype plugin indent on    " enable loading indent file for filetype

Because we enabled pyflakes when we added it as a submodule in ~/.vim/bundle, it will notify you about unused imports and invalid syntax. It will save you a lot of time saving and running just to find out you missed a colon. I like to tell it not use the quickfix window:

let g:pyflakes_use_quickfix = 0
http://i.imgur.com/ZfjFe.png

Pep8

The final plugin that really helps validate your code is the pep8 plugin, it'll make sure your code is consistent across all projects. Add a key mapping to your ~/.vimrc and then you'll be able to jump to each of the pep8 violations in the quickfix window:

let g:pep8_map='<leader>8'
http://i.imgur.com/VU9AB.png

Tab Completion and Documentation

Vim has many different code completion options. We are going to use the SuperTab plugin to check the context of the code you are working on and choose the best for the situation. We've already enabled the SuperTab plugin in the bundle/ folder, so we just have to configure it to be context sensitive and to enable omni code completion in your ~/.vimrc:

au FileType python set omnifunc=pythoncomplete#Complete
let g:SuperTabDefaultCompletionType = "context"

Now we just enable the menu and pydoc preview to get the most useful information out of the code completion:

set completeopt=menuone,longest,preview
http://i.imgur.com/g4lxP.png

We also enabled the pydoc plugin at the beginning with all the submodules; that gives us the ability to hit <leader>pw when our cursor is on a module and have a new window open with the whole documentation page for it.

Code Navigation

Buffers

The most important part about navigating code within vim, is to completely understand how to use buffers. There is no reason to use tabs. Open files with :e <filename> to place in a buffer. We already installed the minibufexpl plugin, so you will already visually see every buffer opened. You can also get a list of them doing :buffers.

You can switch between the buffers using b<number>, such as :b1 for the first buffer. You can also use its name to match, so you can type :b mod<tab> to autocomplete opening the models.py buffer. You need to make sure you are using the minibufexpl from my github since it has patches that make it much better to work with.

To close a buffer you use :bd or :bw.

File Browser

NERD Tree is a project file browser. I must admit I used this heavily back when I was migrating from Visual Studio and used to the Solution Explorer, but I rarely use it anymore. Command-T is usually all you'll need. It is useful when you are getting to know a new codebase for the first time though. Lets bind a shortcut key for opening it:

map <leader>n :NERDTreeToggle<CR>
http://i.imgur.com/R4ZzQ.png

Refactoring and Go to definition

Ropevim is also a great tool that will allow you to navigate around your code. It supports automatically inserting import statements, goto definition, refactoring, and code completion. You'll really want to read up on everything it does, but the two big things I use it for is to jump to function or class definitions quickly and to rename things (including all their references).

For instance, if you are using django and you place your cursor over the class models.Model you reference and then called :RopeGotoDefintion, it would jump you straight to the django library to that class definition. We already have it installed in our bundles, so we bind it to a key to use it:

map <leader>j :RopeGotoDefinition<CR>
map <leader>r :RopeRename<CR>

Searching

The final tool that really speeds up navigating your code is the Ack plugin. Ack is similar to grep, but much better in my opinion. You can fuzzy text search for anything in your code (variable name, class, method, etc) and it'll give you a list of files and line numbers where they are defined so you can quickly cycle through them. Just bind the searching to a key:

nmap <leader>a <Esc>:Ack!

We use ! at the end of it so it doesn't open the first result automatically.

Integration with Git

We installed 2 plugins, git.vim and fugitive, that give us all the integration we need. Git.vim will provide us syntax highlighting for git configuration files; fugitive provides a great interface for interacting with git including getting diffs, status updates, committing, and moving files.

Fugitive also allows you to view what branch you are working in directly from vim. Add this to your statusline in ~/.vimrc:

%{fugitive#statusline()}

The big commands you need to know:

  • Gblame: This allows you to view a line by line comparison of who the last person to touch that line of code is.
  • Gwrite: This will stage your file for commit, basically doing git add <filename>
  • Gread: This will basically run a git checkout <filename>
  • Gcommit: This will just run git commit. Since its in a vim buffer, you can use keyword completion (Ctrl-N), like test_all<Ctrl-N> to find the method name in your buffer and complete it for the commit message. You can also use + and - on the filenames in the message to stage/unstage them for the commit.
http://i.imgur.com/NuRRj.png

Test Integration

django nose

Test runner integration really depends on the testing library you are using and what type of tests you are running but we included a great generic plugin called MakeGreen that executes off of vim's makeprg variable. So for instance, if you are using django with django-nose you could define a shortcut key in your ~/.vimrc like this:

map <leader>dt :set makeprg=python\ manage.py\ test\|:call MakeGreen()<CR>

This will just give you a green bar at the bottom of vim if your test passed or a red bar with the message of the failed test if it doesn't. Very simple.

py.test

I also included the py.test vim plugin for those who prefer it. This plugin has a lot more functionality including executing individual tests by class, file, or method. You can also cycle through the individual assertion errors. I have the following bindings:

" Execute the tests
nmap <silent><Leader>tf <Esc>:Pytest file<CR>
nmap <silent><Leader>tc <Esc>:Pytest class<CR>
nmap <silent><Leader>tm <Esc>:Pytest method<CR>
" cycle through test errors
nmap <silent><Leader>tn <Esc>:Pytest next<CR>
nmap <silent><Leader>tp <Esc>:Pytest previous<CR>
nmap <silent><Leader>te <Esc>:Pytest error<CR>
http://i.imgur.com/RAE7v.png

Virtualenv

Vim doesn't realize that you are in a virtualenv so it wont give you code completion for libraries only installed there. Add the following script to your ~/.vimrc to fix it:

" Add the virtualenv's site-packages to vim path
py << EOF
import os.path
import sys
import vim
if 'VIRTUAL_ENV' in os.environ:
    project_base_dir = os.environ['VIRTUAL_ENV']
    sys.path.insert(0, project_base_dir)
    activate_this = os.path.join(project_base_dir, 'bin/activate_this.py')
    execfile(activate_this, dict(__file__=activate_this))
EOF

Django

The only true django tweak I make is before I open vim I'll export the DJANGO_SETTINGS_MODULE environment so that I get code completion for django modules as well:

export DJANGO_SETTINGS_MODULE=project.settings

Random Tips

If you want to find a new color scheme just go to http://code.google.com/p/vimcolorschemetest/ to preview a large selection.

© John Anderson <sontek@gmail.com> 2011

PyCon Sprints Part 1: The realtime web with gevent, socket io, redis and django

TL;DR: Get the source here.

One of the major themes of PyCon was realtime web utilizing websockets or long polling and coroutines.

Websockets is a new feature in modern browsers which allows bi-directional communication between your server and the users web browser but it isn't a reliable way to do realtime communication since not everyone upgrades as quickly as us web developers would like and older browsers don't support it.

So you either have to fallback to long polling if they don't have websockets or just only support long polling since it works on all browsers. Longpolling is opening a connection to the server and keeping it open with javascript so that the server can send messages back to the client.

Luckily, the fine folks that developed Socket.io have done the legwork to do feature fallback and so it'll work on all browsers by testing for websockets, flash sockets, and then as a last resort long polling.

Page postbacks and refreshes are a thing of the past, the future of the web is realtime communication.

To show you how to achieve this we are going to build a quick Tic Tac Toe application using SocketIO, gevent, redis, and Django.

Lets setup the development environment first:

$ virtualenv --no-site-packages tictactoe
$ cd tictactoe/
$ source bin/activate
$ mkdir src
$ cd src/

Before you continue make sure you install libevent and redis-server using your operating systems package manager.

Then you want to create a pip requirements.txt file that will provide you all the packages you want, the version of redis is very important since they didn't have a publish/subscribe model until 2.2.2:

django==1.3
gevent
redis==2.2.2
simplejson
mock
hg+https://bitbucket.org/Jeffrey/gevent-websocket#egg=gevent-websocket
hg+https://bitbucket.org/Jeffrey/gevent-socketio#egg=gevent-socketio

Then use pip to install everything:

$ pip install -r requirements.txt

Now we need to create the django application. We are going to organize our apps into the apps/ folder, this is a personal preference of mine but I recommend you do the same:

$ django-admin.py startproject tictactoe
$ cd tictactoe/
$ mkdir apps
$ django-admin.py startapp core
$ mv core/ apps/

Open up settings.py and add 'core' to INSTALLED_APPS and create utility attribute we'll use to figure out the directory we are working out of, you should also configure your database settings and at this time:

import os
PROJECT_ROOT = os.path.dirname(__file__)

Then open up manage.py and add a this sys.path line so that it knows we are storing our apps in the apps/ folder.

import sys, os
sys.path.insert(0, os.path.join(settings.PROJECT_ROOT, "apps"))

Now that we've got a base development system configured we can start setting up our environment for doing realtime development. The first step is to setup a WSGI webserver that supports coroutines, since the standard django development server is single threaded. We will use gevent to monkey patch the standard library to get async support:

Create a file called run.py that will replace manage.py runserver:

#!/usr/bin/env python
from gevent import monkey
from socketio import SocketIOServer
import django.core.handlers.wsgi
import os
import sys

# import the django settings file to get PROJECT_ROOT
import settings

# use gevent to patch the standard lib to get async support
monkey.patch_all()

PORT = 9000
os.environ['DJANGO_SETTINGS_MODULE'] = 'settings'
application = django.core.handlers.wsgi.WSGIHandler()

# add our project directory to the path
sys.path.insert(0, os.path.join(settings.PROJECT_ROOT, "../"))

# add our apps directory to the path 
sys.path.insert(0, os.path.join(settings.PROJECT_ROOT, "apps"))

if __name__ == '__main__':
    # Launch the redis server in the background
    os.popen('redis-server &')
    print('Listening on http://127.0.0.1:%s and on port 843 (flash policy server)' % PORT)
    SocketIOServer(('', PORT), application, resource="socket.io").serve_forever()

Now that we have a server that can support the asynchronous calls that we'll be making we can start designing our application.

Lets first setup the apps/core/models.py to describe the database tables we'll need:

from django.db import models
from django.contrib.auth.models import User
import pickle

class Game(models.Model):
    player1 = models.ForeignKey(User, related_name='player1_set')
    player2 = models.ForeignKey(User, related_name='player2_set')
    last_move = models.CharField(max_length=1, null=True, blank=True)
    board = models.CharField(max_length=100,
            default=str(pickle.dumps([''] * 9)))

    def __unicode__(self):
        board = pickle.loads(str(self.board))
        return '%s vs %s (%s)' % (self.player1.username,
                self.player2.username, board)

    def make_move(self, player, move):
        """
        player is X or O and move is a number 0-9
        """
        board = pickle.loads(self.board)
        board[move] = player
        self.board = pickle.dumps(board)
        self.last_move = player
        self.save()

    def get_valid_moves(self):
        """
        Returns a list of valid moves. A move can be passed to get_move_name to
        retrieve a human-readable name or to make_move/undo_move to play it.
        """
        board = pickle.loads(self.board)
        return [pos for pos in range(9) if board[pos] == '']

    def all_equal(self, list):
            """
            Returns True if all the elements in a list are equal, or if
            the list is empty. 
            """
            return not list or list == [list[0]] * len(list)

    def get_winner(self):
        """
        Determine if one player has won the game. Returns X, O, '' for Tie,
        or None
        """
        board = pickle.loads(self.board)

        winning_rows = [[0,1,2], [3,4,5], [6,7,8], # horizontal
                        [0,3,6], [1,4,7], [2,5,8], # vertical
                        [0,4,8], [2,4,6]]          # diagonal

        for row in winning_rows:
            if board[row[0]] != '' and self.all_equal([board[i] for i in row]):
                return board[row[0]]

        # No winner found, check for a tie
        if not self.get_valid_moves():
            return ''

        return None

Then open up apps/core/tests.py and create tests to make sure getting a winner and getting valid moves works properly, because code without tests is broken code!

from django.test import TestCase
from core.models import Game
from django.contrib.auth.models import User
import pickle

class TestTicTacToeBoard(TestCase):
    def setUp(self):
        self.player1 = User.objects.create(username='X')
        self.player2 = User.objects.create(username='O')

        self.player1.save()
        self.player2.save()

    def test_finds_winner(self):
        """
        Tests that getting a winner works properly
        """
        board = pickle.dumps(['X', 'X', 'X', '', '', '', '', '', ''])
        game = Game(player1=self.player1, player2=self.player2,
                board=board)
        winner = game.get_winner()

        self.assertEqual(winner, 'X')

    def test_doesnt_find_winner(self):
        board = pickle.dumps(['', '', '', '', '', '', '', '', ''])
        game = Game(player1=self.player1, player2=self.player2,
                board=board)
        winner = game.get_winner()

        self.assertEqual(winner, None)

    def test_find_tie(self):
        """
        Tests that you get a tie when the board is full
        """
        board = pickle.dumps(['X', 'X', 'O',
                              'O', 'O', 'X',
                              'X', 'O', 'X'])

        game = Game(player1=self.player1, player2=self.player2,
                board=board)
        winner = game.get_winner()

        self.assertEqual(winner, '')

    def test_make_move(self):
        """
        Tests that you can make moves 
        """
        board = pickle.dumps(['', '', '', '', '', '', '', '', ''])

        game = Game(player1=self.player1, player2=self.player2,
                board=board)

        game.make_move('X', 0)
        board = pickle.dumps(['X', '', '', '', '', '', '', '', ''])

        self.assertEqual(board, game.board)
        self.assertEqual('X', game.last_move)

    def test_gets_valid_moves(self):
        """
        Tests that getting valid moves works properly
        """
        board = pickle.dumps(['', '', '', '', '', '', '', '', ''])
        game = Game(player1=self.player1, player2=self.player2,
                board=board)
        moves = game.get_valid_moves()

        self.assertEqual(len(moves), 9)

    def test_doesnt_gets_valid_moves(self):
        """
        Tests that getting valid moves works properly
        """
        board = pickle.dumps(['X', 'X', 'X', 'X', 'X', 'X', 'X', 'X', 'X'])
        game = Game(player1=self.player1, player2=self.player2,
                board=board)
        moves = game.get_valid_moves()

        self.assertEqual(len(moves), 0)

Now that we have a tic tac toe board that is tested, we just need to create our views to allow the 2 users to play a game. We are going to need a view for viewing the game, a view for asynchronously making moves, and a view that our clients can keep an open socket to for the bi-directional communication.

Create the urls.py:

from django.conf.urls.defaults import (patterns, include, url, handler500,
        handler404)

# we'll use admin for creating users and games
from django.contrib import admin
admin.autodiscover()

from core.views import (
    create_move,
    view_game,
    socketio,
)
urlpatterns = patterns('',
    url(
        regex=r'^create_move/(?P<game_id>\d+)/$',
        view=create_move,
        name='create_move'
    ),
    url(
        regex=r'^view_game/(?P<game_id>\d+)/$',
        view=view_game,
        name='view_game'
    ),
    url(
        regex=r'^socket\.io',
        view=socketio,
        name='socketio'
    ),
    (r'^admin/', include(admin.site.urls)),
)

# Django 1.3 Features, allows us to serve static files easily
from django.contrib.staticfiles.urls import staticfiles_urlpatterns
urlpatterns += staticfiles_urlpatterns()

Then our views.py:

from django.views.decorators.http import require_http_methods
from django.contrib.auth.decorators import login_required
from django.shortcuts import get_object_or_404, render_to_response
from django.http import HttpResponse, Http404
from django.template import RequestContext
from django.conf import settings

from redis import Redis
from gevent.greenlet import Greenlet

from core.models import Game

REDIS_HOST = getattr(settings, 'REDIS_HOST', 'localhost')

def _sub_listener(socketio, chan):
    """
    This is the method that will block and listen
    for new messages to be published to redis, since
    we are using coroutines this method can block on
    listen() without interrupting the rest of the site
    """
        red = Redis(REDIS_HOST)
        red.subscribe(chan)

        for i in red.listen():
            socketio.send({'message': i})

def socketio(request):
    """
    This view will handle the 'subscribe' message
    from the client and spawn off greenlet coroutines
    to monitor messages on redis
    """
    socketio = request.environ['socketio']

    while True:
        message = socketio.recv()

        if len(message) == 1:
            message = message[0].split(':')

            if message[0] == 'subscribe':
                print 'spawning sub listener'
                g = Greenlet.spawn(_sub_listener, socketio, message[1])

    return HttpResponse()

@require_http_methods(["POST"])
@login_required
def create_move(request, game_id):
    """
    Creates a move for the logged in player on a specific game
    """
    game, player, opponent_id = _get_game_data(request, game_id)
    move = int(request.POST['move'])
    game.make_move(player, move)

    # Announce to the opponent that the current player made a move
    red = Redis(REDIS_HOST)

    _announce_player_moved(red, game_id, opponent_id, player, move),
    winner = game.get_winner()

    if winner:
        # Announce to current player and the opponent that the game is over
        _announce_game_over(red, game_id, request.user.id, player)
        _announce_game_over(red, game_id, opponent_id, player)

    return HttpResponse()

def view_game(request, game_id, template_name='core/view_game.html'):
    """
    Renders a tic tac toe board to be played for a specific game
    """
    game, player, opponent_id = _get_game_data(request, game_id)

    if game.last_move:
        current_player = 'X' if game.last_move == 'O' else 'O'
    else:
        current_player = 'X'

    board = game.get_board()
    winner = game.get_winner()

    context = { 'game_id': game_id,
                'board': board,
                'player': player,
                'current_player': current_player,
                'winner': winner,
                'game_over': False if winner == None else True
              }

    return render_to_response(template_name, context,
            context_instance=RequestContext(request))

def _announce_player_moved(red, game_id, to_user_id, player, move):
    """
    Publishes a message to redis to to_user_id that their opponent has
    completed a move
    """
    red.publish(to_user_id, ['opponent_moved', int(game_id), player, move])

def _announce_game_over(red, game_id, to_user_id, winner):
    """
    Publishes a message to redis to to_user_id that the game as finished
    """
    red.publish(to_user_id, ['game_over', int(game_id), winner])

def _get_game_data(request, game_id):
    """
        Grabs the game, current player, and its opponent
    """
    game = get_object_or_404(Game, pk=game_id)

    if game.player1 == request.user:
        player = 'X'
        opponent_id = game.player2.id
    elif game.player2 == request.user:
        player = 'O'
        opponent_id = game.player1.id
    else:
        raise Http404

    return game, player, opponent_id

The special part here is in the socketio view, we are launching greenlet coroutines that subscribe to a "channel" on the redis server.

Like we established before, it its not tested, it doesn't work. So we need to write more tests to verify that the views we just created work:

class TestGameViews(TestCase):
    def setUp(self):
        self.player1 = User.objects.create(username='X')
        self.player1.set_password('test')
        self.player2 = User.objects.create(username='O')

        self.player1.save()
        self.player2.save()

        self.game = Game(player1=self.player1, player2=self.player2)
        self.game.save()

        self.client = Client()
        self.client.login(username='X', password='test')

    def test_create_move_publishes_to_redis(self):
        """
        Tests that we are publishing to redis when we create moves
        """
        request = Mock(name='request')
        request.user = self.player1

        redis = Mock(name='redis')
        redis.publish = Mock()

        self.game.board = pickle.dumps(['X', '', '',
                                        '', 'X', '',
                                        '', '', ''])
        self.game.save()

        with patch('core.views.Redis') as mock_redis:
            mock_redis.return_value = redis

            move = 8
            player = 'X'
            response = self.client.post('/create_move/%d/' % self.game.id,
                        {
                            'move': move
                        }
                    )

            redis.publish.assert_called_with(self.player2.id,
                    ['game_over', self.game.id, player])

            _pop_last_call(redis.publish)

            redis.publish.assert_called_with(self.player1.id,
                    ['game_over', self.game.id, player])

            _pop_last_call(redis.publish)

            redis.publish.assert_called_with(self.player2.id,
                    ['opponent_moved', self.game.id, player, move])

    def test_winning_move_publishes_to_redis(self):
        """
        Tests that we are publishing to redis when we create moves
        """
        request = Mock(name='request')
        request.user = self.player1

        redis = Mock(name='redis')
        redis.publish = Mock()

        with patch('core.views.Redis') as mock_redis:
            mock_redis.return_value = redis

            move = 0
            player = 'X'
            response = self.client.post('/create_move/%d/' % self.game.id,
                        {
                            'move': move
                        }
                    )

            redis.publish.assert_called_once_with(self.player2.id,
                    ['opponent_moved', self.game.id, player, move])

    def test_create_move_makes_move(self):
        """
        Tests that we are creating moves in the db when we call create_move
        """
        redis = Mock(name='redis')
        redis.publish = Mock()

        with patch('core.views.Redis') as mock_redis:
            mock_redis.return_value = redis

            move = 0
            player = 'X'
            response = self.client.post('/create_move/%d/' % self.game.id,
                        {
                            'move': move
                        }
                    )

            game = Game.objects.get(pk=self.game.id)
            board = game.get_board()
            self.assertEqual(board[0], player)

    def test_make_move_wins(self):
        pass

def _pop_last_call(mock):
    if not mock.call_count:
        raise AssertionError('Cannot pop last call: call_count is 0')

    mock.call_args_list.pop()

    try:
        mock.call_args = mock.call_args_list[-1]
    except IndexError:
        mock.call_args = None
        mock.called = False

    mock.call_count -=1

There is a copy fancy things going on here, if you aren't familiar with Mock we are using it to tell our view whenever it tries to use the Redis class to use our special mocked object instead so we can record its actions. Then we have the _pop_last_call function that is just to work around the fact that Mock only records the last call of a function and we want to make sure it fired the publish 3 times.

Now that we've proven that our db model and our views are working with our tests, the final piece is to create a template and write the javascript to make the call backs!

We need to get back into settings.py to define our static file directories for loading the javascript:

STATICFILES_DIRS = (
    os.path.join(PROJECT_ROOT, 'static'),
)

Now create your static folder and put jquery 1.5.1 (jquery 1.5 has a bug in it) and socket.io.js in there.

Then we'll define the javascript we need to interact with the the socketio view we defined earlier in a file called game.js:

socket.connect();

socket.on("message", function(obj){
    alert(obj);
    if (obj.message.type == "message") {
        var data = eval(obj.message.data);

        if (data[1] == game_id) {
            if (data[0] == "game_over") {
                    winner = data[2];
                    game_over = true;

                    if (data[2] == "") {
                        SetMessage("Its a tie!");
                    }
                    else {
                        SetMessage("The winner is: " + data[2]);
                    }
                }
            else if (data[0] == "opponent_moved") {
                $('#cell' + data[3]).html(data[2]);
                SwapUser();
            }
        }
    }
});

function MakeMove(sender, move) {
    if (player == current_player && game_over == false) {
        if ($(sender).text().trim() == "") {
            $(sender).html(player);
            SwapUser();

            $.post(create_url, {'move': move},
                function(data) {
                    // successfully made a move
                }
            )
        }
    }
}

function SwapUser() {
    var computer = player == "X" ? "O" : "X";

    if (current_player == player) {
        current_player = computer;
        SetMessage("Your opponents turn!");
    } else {
        current_player = player;
        SetMessage("Your turn!");
    }
}

socket.send("subscribe:" + user_id);

function SetMessage(message) {
    $("#messages").html("<div>" + message + "</div>");
    $("#messages").show();
}

The import parts here are sending the subscribe message to the server so that it will start listening on that specific channel and then handling the message event from the socket so we can do something when the server sends us a new message.

To tie it all togethers lets create templates/core/view_game.html and a django.js file that allows us to call the webserver from javascript with CSRF protection:

django.js:

$('html').ajaxSend(function(event, xhr, settings) {
    function getCookie(name) {
        var cookieValue = null;
        if (document.cookie && document.cookie != '') {
            var cookies = document.cookie.split(';');
            for (var i = 0; i < cookies.length; i++) {
                var cookie = jQuery.trim(cookies[i]);
                // Does this cookie string begin with the name we want?
                if (cookie.substring(0, name.length + 1) == (name + '=')) {
                    cookieValue = decodeURIComponent(cookie.substring(name.length + 1));
                    break;
                }
            }
        }
        return cookieValue;
    }
    if (!(/^http:.*/.test(settings.url) || /^https:.*/.test(settings.url))) {
        // Only send the token to relative URLs i.e. locally.
        xhr.setRequestHeader("X-CSRFToken", getCookie('csrftoken'));
    }
});

templates/core/view_game.html:

<html>
    <head>
        <title>TicTacToe</title>
        <link rel="stylesheet" type="text/css" href="{{ STATIC_URL }}style.css" />
        <script type="text/javascript" src="{{ STATIC_URL }}jquery-1.5.1.min.js"></script>
        <script type="text/javascript" src="{{ STATIC_URL }}django.js"></script>
        <script type="text/javascript" src="{{ STATIC_URL }}socket.io.js"></script>
    </head>
    <body>

    <script type="text/javascript">
        var player = "{{ player }}";
        var current_player = "{{ current_player }}";
        var game_id = {{ game_id }};
        var user_id = {{ request.user.id }};
        var game_over = {{ game_over|lower }};
        var create_url = "{% url create_move game_id %}";

        var socket = new io.Socket(null, {port: {{ request.environ.SERVER_PORT }}, rememberTransport: false});
    </script>
    <script type="text/javascript" src="{{ STATIC_URL }}game.js"></script>
    You are: {{ player }}
    <div id="messages">
        {% if winner == None %}
            {% if player == current_player %}
                Your turn
            {% else %}
                Your opponents turn
            {% endif %}
        {% else %}
            {% if winner == "" %}
                Its a tie!
            {% else %}
                {{ winner }} wins!
            {% endif %}
        {% endif %}
    </div>
    {% for row in board %}
        <div class="cell" id="cell{{ forloop.counter0 }}" onclick="MakeMove(this, '{{ forloop.counter0 }}')">
            {{ row }}
        </div>
        {% if forloop.counter|divisibleby:3 %}
            <br class="clear" />
        {% endif %}
    {% endfor %}
    </body>
</html>

and then static/style.css file:

.cell {
    border: solid 1px #000;
    width: 100px;
    height: 100px;
    float: left;
    text-align: center;
    font-size: 70px;
}
.clear {
    clear: both;
}

and you now have a realtime tic tac toe application.

To play the game you just have to launch the redis server and your WSGI webserver:

$ redis-server
$ python run.py

Watch a video of it in action here

Using Django Context Variables with Javascript/Ajax

If you need to access Django context variables in javascript you can create a script node to store the values you want and then your scripts can access them:

<html>
<head>
    <script type="text/javascript">
        var today = "{{ today }}";
    </script>

    <script type="text/javascript" src="{{ MEDIA_URL }}/jquery.js"></script>
    <script type="text/javascript" src="{{ MEDIA_URL }}/today.js"></script>
</head>
<body>
    Date is: <div id="date">{{ today }}</div>
    <a href="#" id="yesterday">Get Yesterday</a>
</body>

Now that you can access the context variables you can make an ajax call back to have Django do more processing on the data using jQuery's post function:

// today.js
$(document).ready(function() {
    $('#yesterday').click(function() {
        $.post("/today/", {
            today: today,
            },

            function(data) {
                $('#date').html(data);
                today = data;
            }
        );
    });
});

Now all you have to do is define a view that loads the original context data and handles the async call back using the is_ajax method on the request like this:

from datetime import date, timedelta
from django.template import RequestContext
from django.shortcuts import render_to_response
from django.http import HttpResponse
from datetime import datetime
from dateutil import parser

DATE_FORMAT='%m/%d/%Y'

def today(request, template='core/today.html'):
    if request.is_ajax():
        t = request.POST.get('today')
        dt = parser.parse(t)
        one_day = timedelta(days=1)
        yesterday = dt - one_day

        return HttpResponse(yesterday.strftime(DATE_FORMAT))
    else:
        today = date.today()
        return render_to_response(template, {'today': today.strftime(DATE_FORMAT)},
                context_instance=RequestContext(request))

This is a very basic example of how to access Django's context variables and using ajax with Django but it should now be obvious how easy it is to do more complex things with it.

Exception occurred processing WSGI script with Django

If you are running Django under mod_wsgi on apache you might see errors like "IOError: failed to write data" followed by "Exception occurred processing WSGI script" in your logs, this is because your code is actually throwing an exception but Django likes to hide any errors from your users (for obvious reasons).

Usually Django would send you an e-mail with the full callstack of the error to ADMINS defined in your settings.py but if you don't have your e-mail settings or ADMINS properties configured it will not be able to send you your e-mail report.

Also, if you have your e-mail host server defined wrong (i.e gmail.com instead of smtp.gmail.com) your server will just keep churning and never load your 404 or 500 server errors.

If you don't have an SMTP server to use and need to see the exception your site is throwing you could always set Debug=True but then anyone who accesses your site in that time will also see the information of the exception.

To use g-mail with django click here

For Django documentation on email settings click here

Tips and Tricks for the Python Interpreter

I have seen a lot of people switch over to using ipython, bpython, etc to get auto-complete support without realizing that the standard interpreter does have this functionality.

To enable auto-complete support in the python interpreter you need to create a python startup file that enables readline support. A python startup file is just a bunch of python code that gets executed at startup of the interpreter. To do this you just setup PYTHONSTARTUP in your ~/.bashrc and then create a ~/.pythonrc.py file:

#.bashrc
PYTHONSTARTUP=~/.pythonrc.py
export PYTHONSTARTUP

#.pythonrc.py
try:
    import readline
except ImportError:
    print("Module readline not available.")
else:
    import rlcompleter
    readline.parse_and_bind("tab: complete")

Now when you are in python you have tab completion on importing, calling methods on a module, etc.

>>> import o
object(  oct(     open(    or       ord(     os

I always end up using the pretty print module for viewing long lists and strings in the interpreter so I prefer to just use it by default:

# Enable Pretty Printing for stdout
import pprint
def my_displayhook(value):
    if value is not None:
        try:
            import __builtin__
            __builtin__._ = value
        except ImportError:
            __builtins__._ = value

        pprint.pprint(value)

sys.displayhook = my_displayhook

It is also very useful to be able to load up your favorite editor to edit lines of code from the interpreter, you can do this by adding the following into your ~/.pythonrc.py:

import os
import sys
from code import InteractiveConsole
from tempfile import mkstemp

EDITOR = os.environ.get('EDITOR', 'vi')
EDIT_CMD = '\e'

class EditableBufferInteractiveConsole(InteractiveConsole):
    def __init__(self, *args, **kwargs):
        self.last_buffer = [] # This holds the last executed statement
        InteractiveConsole.__init__(self, *args, **kwargs)

    def runsource(self, source, *args):
        self.last_buffer = [ source.encode('latin-1') ]
        return InteractiveConsole.runsource(self, source, *args)

    def raw_input(self, *args):
        line = InteractiveConsole.raw_input(self, *args)
        if line == EDIT_CMD:
            fd, tmpfl = mkstemp('.py')
            os.write(fd, b'\n'.join(self.last_buffer))
            os.close(fd)
            os.system('%s %s' % (EDITOR, tmpfl))
            line = open(tmpfl).read()
            os.unlink(tmpfl)
            tmpfl = ''
            lines = line.split( '\n' )
            for i in range(len(lines) - 1): self.push( lines[i] )
            line = lines[-1]
        return line

c = EditableBufferInteractiveConsole(locals=locals())
c.interact(banner='')

# Exit the Python shell on exiting the InteractiveConsole
sys.exit()

For Django developers when you load up the ./manage.py shell it is nice to have access to all your models and settings for testing:

# If we're working with a Django project, set up the environment
if 'DJANGO_SETTINGS_MODULE' in os.environ:
    from django.db.models.loading import get_models
    from django.test.client import Client
    from django.test.utils import setup_test_environment, teardown_test_environment
    from django.conf import settings as S

    class DjangoModels(object):
        """Loop through all the models in INSTALLED_APPS and import them."""
        def __init__(self):
            for m in get_models():
                setattr(self, m.__name__, m)

    A = DjangoModels()
    C = Client()

After these tweaks the python interpreter is a lot more powerful and you really lose the need for the more interactive shells like ipython and bpython. All of these settings work in both python2 and python3.

If you want to see my complete ~/.pythonrc.py you can get it on github

Next Page