Recently I came across a problem where a number of applications needed to interact with a third-party web services-based system. The system provided a synchronous SOAP interface to what might be very long-running operations (such as activating N devices across a potentially very large network). I wanted to see if I could find a simple technique that met a few objectives:
- Turn a synchronous interface into an asynchronous one
- Store "messages" so that their state could be queried at any time
- Use REST where possible to make it easy for different types of applications to use it
CouchDB and EventMachine provide a mechanism that meets all of these in a satisfyingly simple way - couch-gateway! The gateway is a handover point where an application can submit a REST message and immediately return. The message is stored and forwarded to a blocking "slow" system for processing, and at some point in the future the results are written back to the message store.
CouchDB provides the REST API for submitting, storing and querying the status of messages. EventMachine provides the long-running process that listens for message events using Couch's Change Notification API. It despatches messages to the target system without blocking so that long responses don't prevent other messages from being handled.
Below are some of the implementation details that act as a sort of tutorial, or just remind me of what I've done!
How it works
The 'message' structure is simply a text string with some meta information. A test client using CouchRest can insert a new message document:
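A minimal sketch of such an insert, using only Ruby's standard library rather than CouchRest so that it has no gem dependencies. The database URL and the field names (`text`, `sender`, `created_at`) are assumptions for illustration, not the gateway's actual schema:

```ruby
require 'json'
require 'net/http'
require 'securerandom'
require 'time'
require 'uri'

# Hypothetical database location; adjust to your own CouchDB.
MESSAGES_DB = 'http://localhost:5984/messages'

# Build the message document: a text string plus some meta information.
# The field names here are illustrative assumptions.
def build_message(text, sender)
  { 'text' => text, 'sender' => sender, 'created_at' => Time.now.utc.iso8601 }
end

# Build (but do not send) the PUT request that stores the document under
# a client-generated ID.
def build_put_request(doc, id = SecureRandom.uuid)
  uri = URI("#{MESSAGES_DB}/#{id}")
  request = Net::HTTP::Put.new(uri)
  request['Content-Type'] = 'application/json'
  request.body = JSON.generate(doc)
  [uri, request]
end

# Send the request; this needs a running CouchDB at MESSAGES_DB.
def submit(text, sender)
  uri, request = build_put_request(build_message(text, sender))
  Net::HTTP.start(uri.host, uri.port) { |http| http.request(request) }
end
```

Calling `submit('activate device 42', 'test-client')` returns as soon as Couch has stored the document; nothing in the client waits for the slow system.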
Couch saves the message as a JSON document (CouchRest does a nice job of assigning the ID from Couch's list of UUIDs if one isn't provided):
The client application can now get the status of their message (which will be empty by default):
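As a rough sketch, a status check is a plain GET of the document followed by reading one field. The `status` field name and the database URL are assumptions here:

```ruby
require 'json'
require 'net/http'
require 'uri'

# Hypothetical database location; adjust to your own CouchDB.
MESSAGES_DB = 'http://localhost:5984/messages'

# Extract the status from a document body as returned by CouchDB.
# Returns nil while the gateway has not yet written a status.
def status_from(body)
  JSON.parse(body)['status']
end

# Fetch the document and return its status; needs a running CouchDB.
def fetch_status(id)
  status_from(Net::HTTP.get(URI("#{MESSAGES_DB}/#{id}")))
end
```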
Already we've met two of the objectives (without writing any code!). Client applications can submit messages using REST, and each message is persisted so that its status can be queried at any time.
The other component of the gateway uses EventMachine to start a long-running listener process. It listens for changes to the messages database and reacts when notified:
The EventMachine loop runs until we shut down the listener process. We open a continuous HTTP connection to Couch's changes interface. The changes API supports a few polling approaches, but I've chosen the 'continuous' mode, which sends a line for each notification as it occurs. The em-http-request gem provides HTTP streaming response processing, so we can provide a callback that is invoked whenever we receive a change notification. The 'since' option lets us specify the update sequence number after which notifications should start (useful when the listener restarts, to avoid receiving notifications for messages already processed). The include_docs option avoids the need to re-fetch the message to retrieve its text.
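The part of the listener that is independent of EventMachine can be sketched with the standard library alone: building the changes URL and turning streamed chunks into parsed notifications. In continuous mode Couch sends one JSON object per line and blank lines as heartbeats, and a chunk may end in the middle of a line, so the sketch buffers until a newline arrives. The database URL, the filter name `gateway/unsent`, and the class name are assumptions:

```ruby
require 'json'
require 'uri'

MESSAGES_DB    = 'http://localhost:5984/messages' # hypothetical
CHANGES_FILTER = 'gateway/unsent'                 # hypothetical design doc/filter

# URL for the continuous changes feed, starting after sequence `since`.
def changes_url(since)
  query = URI.encode_www_form(
    feed: 'continuous', since: since, include_docs: true, filter: CHANGES_FILTER
  )
  "#{MESSAGES_DB}/_changes?#{query}"
end

# Accumulates raw chunks from the feed and yields one parsed change per
# complete line, remembering the last sequence number seen.
class ChangeFeedBuffer
  attr_reader :last_seq

  def initialize
    @buffer = +''
    @last_seq = nil
  end

  def feed(chunk)
    @buffer << chunk
    while (newline = @buffer.index("\n"))
      line = @buffer.slice!(0..newline).strip
      next if line.empty?              # heartbeat line
      change = JSON.parse(line)
      next unless change.key?('seq')   # e.g. a closing {"last_seq": ...} line
      @last_seq = change['seq']
      yield change
    end
  end
end
```

With em-http-request, `feed` would be called from the request's `stream` callback, and `last_seq` could be saved so that a restarted listener can pass it as `since`.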
Finally, the filter lets the listener specify which messages it should be notified of. Like Couch map views, Couch filters are JavaScript functions, but where views emit document fields, filters determine whether a document should be returned at all:
Using the filter, we avoid repeat notifications when the gateway updates a message's status after successfully despatching it. Filter and view functions can be kept in text files and uploaded to Couch when they change, which makes them easy to manage in source control and to upload using Rake.
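As an illustration, the filter lives in a design document under the `filters` key, and a Rake task could build that document from Ruby before uploading it. The design document name `gateway`, the filter name `unsent`, and the filter logic itself (pass messages that have no status, or a hypothetical `queued` status) are assumptions, not the gateway's actual filter:

```ruby
require 'json'

# Hypothetical filter: notify only for messages that still need despatching.
# In practice this could be read from a .js file kept in source control.
UNSENT_FILTER = <<~JS
  function(doc, req) {
    return doc.text !== undefined && (!doc.status || doc.status === 'queued');
  }
JS

# Build the design document. Pass the current _rev when updating an
# existing design document, otherwise Couch rejects the write as a conflict.
def design_document(rev = nil)
  doc = {
    '_id'     => '_design/gateway',
    'filters' => { 'unsent' => UNSENT_FILTER }
  }
  doc['_rev'] = rev if rev
  doc
end

# JSON body to PUT to <database>/_design/gateway
def design_document_json(rev = nil)
  JSON.generate(design_document(rev))
end
```

The listener would then ask for this filter by passing `filter=gateway/unsent` on the changes request.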
After we receive a notification, we can handle it by despatching it to the target system:
Here we use EventMachine to make a non-blocking HTTP request to the slow target server, and Ruby Fibers to resume the processing when the callback occurs, telling us that the request has completed. Once the Fiber is resumed, we can update the status of the message in Couch with the results of the slow operation (this technique is expertly described by Ilya Grigorik).
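The Fiber technique can be shown without EventMachine. In this sketch a hypothetical `FakeReactor` stands in for the event loop and the slow server, and a plain Hash stands in for Couch; only the suspend-and-resume pattern is the point:

```ruby
# Stand-in for an event loop: callbacks are queued and run later, the way
# EventMachine would invoke them when a response arrives.
class FakeReactor
  def initialize
    @pending = []
  end

  # Pretend to start a slow request; returns immediately.
  def request(text, &callback)
    @pending << -> { callback.call("processed: #{text}") }
  end

  # Deliver all outstanding "responses".
  def run
    @pending.shift.call until @pending.empty?
  end
end

# Despatch one message inside its own Fiber. The body reads top to bottom
# like blocking code, but the caller gets control back at Fiber.yield.
def dispatch(reactor, message, store)
  fiber = Fiber.new do
    reactor.request(message['text']) { |response| fiber.resume(response) }
    response = Fiber.yield # suspended here until the callback resumes us
    store[message['_id']] = message.merge('status' => response)
  end
  fiber.resume
end

reactor = FakeReactor.new
store = {}
dispatch(reactor, { '_id' => 'a', 'text' => 'one' }, store)
dispatch(reactor, { '_id' => 'b', 'text' => 'two' }, store)
# Both despatches have returned, and neither status has been written yet.
reactor.run
# Now store['a'] and store['b'] each carry a status.
```

Because each message has its own Fiber, a slow response for one message does not delay the despatch of the next.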
couch-gateway has thus provided the last of our objectives: turn a synchronous (potentially slow) interface into an asynchronous one!
The last piece of the gateway puzzle done to date is to catch up with missed messages that may have been posted while the listener was down. Couch views let us query for messages based on their status:
This map function will return all documents that don't have a status (the gateway application is responsible for managing the status of a message). We can now replay these messages once the listener is running: by simply updating their status, each will become a new notification for the listener to handle.
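A sketch of the replay step, assuming the view is queried with `include_docs=true` and that the listener's filter accepts a hypothetical `queued` status. The map function, the view name, and the status value are illustrative:

```ruby
require 'json'

# Hypothetical map function for a view such as _design/gateway/_view/unsent:
# emits every document that has no status yet.
UNSENT_MAP = <<~JS
  function(doc) {
    if (!doc.status) { emit(doc._id, null); }
  }
JS

# Take the JSON body returned by the view (queried with include_docs=true)
# and return each document with its status set, ready to be written back.
def replay_docs(view_response_body)
  JSON.parse(view_response_body).fetch('rows').map do |row|
    row.fetch('doc').merge('status' => 'queued')
  end
end

# Body for a single POST to <database>/_bulk_docs that updates them all.
def bulk_payload(docs)
  JSON.generate('docs' => docs)
end
```

Each document keeps its `_rev`, so Couch treats the write as an update, and each update appears on the changes feed as a fresh notification.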
There are lots of improvements that can be made to the code. I've listed a few key ones I can think of so far:
- subscribing to message updates: as it stands, a client application would need to poll for status updates. It would be nice to provide a pub-sub approach (over REST?) to support this.
- pluggable despatchers: allowing different messages to be despatched to different systems
- security
Any suggestions or feedback welcome!
Useful references