Building realtime applications with CycleJS and RxJS

Introduction

Today we’re continuing our Building Realtime Apps tutorial series by taking a look at how to build a realtime chat application using CycleJS, a functional, reactive framework that uses the Reactive JavaScript Extensions under the hood. We’ll see how to model an application using Cycle and get familiar with the core concept of RxJS: data as streams that can be observed.

If you’re here for the source code you can find it all on GitHub. My thanks also to Oliver Ash who dedicated a lot of time towards assisting me when I was first getting started with Cycle.

This application is built using Webpack and ES2015, with Babel transpiling the JavaScript. Full instructions on how to run the application locally are provided in the README. You’ll also need a small server running for the application to POST data to, such as our Sinatra Server.

Observables

In CycleJS everything is built on top of Observables, and understanding these is crucial to working with Cycle effectively. The CycleJS docs on Observables are a great, thorough read, but include this quote which sums them up nicely:

Observables are lazy event streams which can emit zero or more events, and may or may not finish.

Nearly every bit of data we’ll use in a Cycle application is accessed as an observable. For example, if we wanted to listen out for the value of an input box changing, we could create an observable that represents those changes:

const changeMessage$ = sources.DOM
  .select('.input-message')
  .events('input')
  .map(e => e.target.value)

Here we access the DOM element with the class .input-message (sources.DOM is a way of accessing the DOM, provided by CycleJS), listen out for any input events, and map each event to the value of the input box. This returns an observable, commonly referred to as a stream, and it’s denoted as a stream by a $ at the end of the variable name.

Now, any part of our application that cares about the user changing the input box can subscribe to the changeMessage$ stream, and be notified when the value is changed.
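
To make the idea of a lazy stream more concrete, here is a minimal sketch in plain JavaScript with no RxJS involved. The names createStream and fakeInput are invented for this illustration and are not part of RxJS or CycleJS; the real libraries do a great deal more. The sketch only shows two things: nothing is wired up until someone subscribes, and map produces a new stream from an existing one.

```javascript
// A toy stream: wraps a "producer" function that only runs on subscribe.
// This illustrates the idea; it is not how RxJS is implemented.
function createStream(producer) {
  return {
    subscribe(listener) {
      producer(listener);
    },
    map(fn) {
      // A mapped stream runs the original producer and transforms each value.
      return createStream((listener) => {
        producer((value) => listener(fn(value)));
      });
    },
  };
}

// Hypothetical stand-in for an input box: it just keeps a list of handlers.
const fakeInput = {
  handlers: [],
  addEventListener(type, handler) {
    this.handlers.push(handler);
  },
  type(text) {
    this.handlers.forEach((handler) => handler({ target: { value: text } }));
  },
};

const input$ = createStream((listener) => {
  fakeInput.addEventListener('input', listener);
});
const changeMessage$ = input$.map((e) => e.target.value);

// Lazy: creating the streams has not attached any handler yet.
const handlersBeforeSubscribe = fakeInput.handlers.length;

const seen = [];
changeMessage$.subscribe((value) => seen.push(value));

fakeInput.type('h');
fakeInput.type('hi');
console.log(handlersBeforeSubscribe, seen);
```

The subscriber receives plain strings rather than event objects, which is the same shape of result the changeMessage$ stream above gives us.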

CycleJS embraces the above pattern and provides us with a little more structure using the Model-View-Intent pattern:

MVI is a simple pattern to refactor the main() function into three parts: Intent (to listen to the user), Model (to process information), and View (to output back to the user).

CycleJS also provides us with drivers, which listen to observable inputs and perform side effects, such as DOM rendering (which CycleJS does via a Virtual DOM implementation) and HTTP requests, for which Cycle provides a driver.

To kick off and start a CycleJS application we provide it with an object of drivers and a main function that is expected to return an observable for each of the drivers we’re using. For example, if we have these drivers:

const drivers = {
  DOM: makeDOMDriver('#app'),
  HTTP: makeHTTPDriver()
};

That’s an object with two drivers, DOM and HTTP, so we need to give Cycle a function that returns an object with two observables, one for each driver. In Cycle terms, these are called sinks. A driver is a function that listens to an observable sink of data. In our case, the function might look like:

function main(sources) {
  // stuff here...
  const myDomObservable$ = ...;
  const myHttpObservable$ = ...;
  return {
    DOM: myDomObservable$,
    HTTP: myHttpObservable$
  }
}

Cycle then takes care of linking the drivers to the observables and running your application:

Cycle.run(main, drivers);

This will initialise your application and get everything up and running.
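
As a rough picture of what linking drivers to sinks involves, here is a toy version of the wiring step in plain JavaScript. The names toyRun, createSubject and the LOG driver are hypothetical and exist only for this sketch; the real Cycle.run also handles things like disposal and error handling that are left out here.

```javascript
// Minimal subject: remembers listeners and pushes values to them.
function createSubject() {
  const listeners = [];
  return {
    subscribe(listener) {
      listeners.push(listener);
    },
    next(value) {
      listeners.forEach((listener) => listener(value));
    },
  };
}

// Toy wiring step (hypothetical, not Cycle's actual implementation).
function toyRun(main, drivers) {
  const proxies = {};
  const sources = {};
  Object.keys(drivers).forEach((name) => {
    // Each driver listens to a proxy for its sink and returns a source.
    proxies[name] = createSubject();
    sources[name] = drivers[name](proxies[name]);
  });
  const sinks = main(sources);
  Object.keys(drivers).forEach((name) => {
    // Forward whatever main emits into the matching driver.
    sinks[name].subscribe((value) => proxies[name].next(value));
  });
}

// A made-up driver whose side effect is recording lines in an array.
const written = [];
const drivers = {
  LOG: (sink$) => {
    sink$.subscribe((line) => written.push(line));
    return null; // this driver exposes no source
  },
};

function main() {
  return {
    // A tiny stream that emits two values as soon as it is subscribed to.
    LOG: {
      subscribe(listener) {
        listener('hello');
        listener('world');
      },
    },
  };
}

toyRun(main, drivers);
console.log(written);
```

The proxy is what lets a driver start listening before main has returned its sinks, which is how sources and sinks can depend on each other.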

Pusher and RxJS

We know that when building an application using CycleJS we need to work with RxJS streams of data, but not everything you work with will provide one. For example, in this case we need to use Pusher, but the Pusher library doesn’t provide a stream for our messages. When this happens we need to write a small wrapper that takes our Pusher data and produces a stream from it.

RxJS lets us create a Subject, which is a way for us to create our own observables. We can create a new subject and pass data to it; that data is then emitted to any subscribers.

Below I create a module that exports a function pusherObservable. It takes a channel name and an event name, returning a stream of all data sent through that channel for the given event name:

import Pusher from 'pusher-js';
import { Subject } from 'rx';

const pusher = new Pusher('YOUR_PUSHER_API_KEY_HERE', {});

function pusherObservable(channelName, eventName) {
  // a Subject lets us push values into a stream by hand
  const pusherMessages$ = new Subject();
  const channel = pusher.subscribe(channelName);
  // forward every Pusher event for this event name into the subject
  channel.bind(eventName, (data) => {
    pusherMessages$.onNext(data);
  });

  // emit an initial null so subscribers get a value before any message arrives
  return pusherMessages$.startWith(null);
}

export { pusherObservable };

We can use it like so to get a stream of Pusher events from a particular channel:

const pusherMessages$ = pusherObservable('messages', 'new_message');
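
The same wrapping pattern can be tried without a Pusher account. The sketch below is plain JavaScript: createSubject is a hand-rolled stand-in for a subject, and createFakeChannel is a hypothetical object that only mimics the bind method used above. Neither is part of the Pusher or RxJS libraries, and the initial null value is left out to keep the example short.

```javascript
// Hand-rolled subject, using the onNext name that appears in this article.
function createSubject() {
  const listeners = [];
  return {
    subscribe(listener) {
      listeners.push(listener);
    },
    onNext(value) {
      listeners.forEach((listener) => listener(value));
    },
  };
}

// Hypothetical stand-in for a channel: bind() registers a callback,
// trigger() simulates the server pushing an event.
function createFakeChannel() {
  const callbacks = {};
  return {
    bind(eventName, callback) {
      callbacks[eventName] = callbacks[eventName] || [];
      callbacks[eventName].push(callback);
    },
    trigger(eventName, data) {
      (callbacks[eventName] || []).forEach((callback) => callback(data));
    },
  };
}

// Same shape as pusherObservable, but the channel is passed in.
function channelObservable(channel, eventName) {
  const messages$ = createSubject();
  channel.bind(eventName, (data) => messages$.onNext(data));
  return messages$;
}

const channel = createFakeChannel();
const received = [];
channelObservable(channel, 'new_message').subscribe((message) => {
  received.push(message.text);
});

channel.trigger('new_message', { text: 'hello' });
channel.trigger('other_event', { text: 'ignored' });
channel.trigger('new_message', { text: 'world' });
console.log(received);
```

Passing the channel in as an argument is a design choice made for this sketch: it makes the wrapper easy to exercise with a fake, whereas the module above creates its own Pusher connection.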

Building our chat application

Now that we’re a little more familiar with the principles of Cycle we can start to focus on our chat application. We’ll structure our application using the Model-View-Intent pattern, first defining our intents inside the main function:

function main(sources) {
  // DOM is the DOM source that Cycle passes in as sources.DOM
  const intent = function(DOM) {
    const changeMessage$ = DOM
      .select('.input-message')
      .events('input')
      .map(e => e.target.value)
      .share();

    return {
      changeMessage$: changeMessage$,

      // emit the latest message text each time the form is submitted
      submitMessage$: changeMessage$
        .sample(DOM.select('.messages-form').events('submit'))
        .share()
    };
  };
}

You can think of intents as things the user will want to do by interacting with your page; typically these are button clicks, form filling, and so on. The intent function takes the DOM source, which gives access to all events that occur in the DOM. Cycle passes this in to the main function as sources.DOM, so we can create an object that represents all user actions by calling the intent function with it:

const actions = intent(sources.DOM);

Now actions is an object of streams, each representing a user action. In our case we have two streams:

  • changeMessage$ which tracks every time the user changes the value of the message input box
  • submitMessage$ which tracks every time the user submits the form to send a message
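
The submitMessage$ stream relies on sampling: it remembers the most recent value typed and emits it when the form is submitted. The plain JavaScript sketch below models that behaviour with a hypothetical createSample helper, which is not an RxJS function. It assumes, as I understand RxJS 4 to behave, that a sample only emits when a new value has arrived since the previous one.

```javascript
// Toy model of sampling: keep the latest source value and emit it
// when the sampler fires. createSample is a made-up name for this sketch.
function createSample(onEmit) {
  let hasValue = false;
  let latest;
  return {
    // Called for each value on the source stream (each keystroke).
    sourceNext(value) {
      latest = value;
      hasValue = true;
    },
    // Called each time the sampler stream fires (each form submit).
    samplerNext() {
      if (hasValue) {
        hasValue = false;
        onEmit(latest);
      }
    },
  };
}

const submitted = [];
const sample = createSample((text) => submitted.push(text));

sample.sourceNext('h');
sample.sourceNext('hi');
sample.samplerNext(); // submit: emits the latest text
sample.samplerNext(); // submit again with nothing new typed: no emit
sample.sourceNext('yo');
sample.samplerNext(); // submit: emits the new text
console.log(submitted);
```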

The Model

Next we need to define the model, which is a single stream of data representing the state of your application. To create the model we have a function that takes in the user actions and returns an observable:

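// Observable comes from the 'rx' package; allPusherMessages$ is defined elsewhere (see GitHub)
const model = function(actions) {
  // the current input text: the latest typed value, reset to '' on submit
  const message$ = Observable.merge(
    actions.changeMessage$,
    actions.submitMessage$.map(x => '')
  ).startWith('');

  // combine the message list and the input text into one state object
  return Observable.combineLatest(
    allPusherMessages$,
    message$,
    (pusherMessages, message) => ({ pusherMessages, message })
  );
};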

We first need to access the latest message from the user, so we can keep the input that will display the message up to date with what the user has typed. To do this we want to merge two streams: the observable that tracks changes to the message and the observable that tracks the form being submitted. merge takes many streams and returns one single stream. We listen to submitMessage$ so that when the user hits the ‘Submit’ button we can clear the form, setting the message back to an empty string.

You’ll also see that we subscribe to allPusherMessages$ within the call to combineLatest. Observable.combineLatest takes many streams and a function that will be called with the latest value from each stream. Its main use is when you need to run some code that needs the latest value of some data from more than one stream. In our case, to compute the current model for our application we need both the messages sent through Pusher and the value of the message input, so with combineLatest we create a new stream that returns an object with the latest value of those streams.
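
To see how combining the latest values works, here is a small plain JavaScript model of it. createCombineLatest is a hypothetical helper written for this sketch, not the RxJS function itself; inputs are identified by index and values are pushed in by hand. It also shows why giving a stream an initial value matters: nothing is emitted until every input has produced at least one value.

```javascript
// Toy model of combining latest values from several inputs.
// createCombineLatest is a made-up helper, not part of RxJS.
function createCombineLatest(count, combine, onEmit) {
  const latest = new Array(count);
  const seen = new Array(count).fill(false);
  // Call next(index, value) when input number `index` emits `value`.
  return function next(index, value) {
    latest[index] = value;
    seen[index] = true;
    // Only emit once every input has produced at least one value.
    if (seen.every(Boolean)) {
      onEmit(combine(...latest));
    }
  };
}

const states = [];
const next = createCombineLatest(
  2,
  (pusherMessages, message) => ({ pusherMessages, message }),
  (state) => states.push(state)
);

next(1, '');                 // the input text starts as ''; nothing emitted yet
next(0, []);                 // the message list arrives; first state emitted
next(1, 'h');                // the user types; new state emitted
next(0, [{ text: 'hey' }]);  // a chat message arrives; new state emitted
console.log(states);
```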

You can find the implementation of allPusherMessages$ on GitHub, but for now just know that it’s a stream that contains all the messages that have been received by the application. It doesn’t just emit the latest message, but instead an array of all messages. We need this because when we want to render our data to the browser we want to show all messages, not just the very latest one.
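
One common way to turn a stream of single messages into a stream of "all messages so far" is to accumulate them, in the same way reduce works on an array but emitting each intermediate result. The sketch below is plain JavaScript using a hypothetical createScan helper; it is only an illustration of that idea and is not the implementation from the GitHub repository.

```javascript
// Toy accumulator: applies a reducer to each incoming value and
// emits the running result. createScan is a made-up helper name.
function createScan(reducer, seed, onEmit) {
  let accumulated = seed;
  return function next(value) {
    accumulated = reducer(accumulated, value);
    onEmit(accumulated);
  };
}

const snapshots = [];
const nextMessage = createScan(
  // Build a new array each time rather than mutating the old one.
  (allMessages, message) => allMessages.concat([message]),
  [],
  (allMessages) => snapshots.push(allMessages)
);

nextMessage({ text: 'first' });
nextMessage({ text: 'second' });
console.log(snapshots.map((allMessages) => allMessages.length));
```

Each emitted value is the full list at that moment, which is what the view needs in order to render every message.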

Now we have a function that will return a stream of our latest model, we can create an observable representing the latest state of our application. To do this we call the model function, passing in the actions object that we created earlier:

const state$ = model(actions);

Now we have the state of our application at any given point, we’re ready to render to the DOM.

The View

To render we create a view function that will be given the state$ stream and be expected to return a virtual DOM representing the application for the given state. CycleJS uses the virtual-dom library and provides helpers for creating these elements. Let’s define our view function, which will map the state$ observable to create an observable of DOM elements. I split the function that returns a virtual DOM into viewMessages, purely to keep the code a little cleaner.

Note that to save space I’ve stripped some of the HTML that’s in the real code on GitHub. Additionally, each Pusher message in the pusherMessages array has three properties: text, username and time.

function view(state$) {
  // map every state to a virtual DOM tree
  return state$.map(({ pusherMessages, message }) => {
    return viewMessages(pusherMessages, message);
  });
}

function viewMessages(pusherMessages, message) {
  return div([
    div({ id: 'message-list' }, pusherMessages.map(renderPusherMessage)),
    div([
      form({ className: 'messages-form', onsubmit: (e) => e.preventDefault() }, [
        input({ className: 'input-message', value: message }),
        button({ className: 'send-message' }, [
          span({ className: 'white light fa fa-paper-plane-o' })
        ])
      ])
    ])
  ]);
}

function renderPusherMessage({ text, username, time }) {
  return div({ className: 'message' }, [
    div({ className: 'text-display' }, [
      div({ className: 'message-data' }, [
        span({ className: 'author' }, username),
        // convert the Date to a string so it can be rendered as text
        span({ className: 'timestamp' }, new Date(time).toString())
      ]),
      p({ className: 'message-body' }, text)
    ])
  ]);
}

It might seem a bit weird and foreign to write your HTML using JavaScript calls, but after you’ve used CycleJS it will start to feel natural. Being able to easily split it up into different functions, as I’ve done above, is really nice and makes it easier to work with larger HTML blocks, compared to more traditional templating.

HTTP Requests

We’re now so close to being done. The final piece of the puzzle is creating the HTTP requests to POST the user’s message to our server when they hit the submit button. To do this we need to subscribe to the submitMessage$ stream and make an HTTP request every time there’s a new message. We can do this and create a new stream of objects representing HTTP requests, which we’ll store in a stream called request$.

Here I’ve hard-coded the username to 'pusher' to keep the code shorter, but in the code on GitHub you’ll see that we’ve got a little bit extra to collect the user’s Twitter name before they start sending messages.

const request$ = actions.submitMessage$.filter(
  (message) => message !== ''
).map((message) => {
  return {
    method: 'POST',
    url: 'http://localhost:4567/messages',
    headers: {
      'Content-Type': 'application/json'
    },
    send: {
      time: new Date(),
      text: message,
      username: 'pusher'
    }
  }
});

We first filter the message stream to make sure we’re not sending blank messages, and then we map that stream to be a stream of objects that tell CycleJS what HTTP request it should make. In a moment we’ll hand this stream off to the CycleJS HTTP driver, which is responsible for taking a stream and making the actual HTTP requests.
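
Because filter and map on a stream behave much like their array counterparts, the transformation can be tried on a plain array first. In the sketch below, toRequest is a hypothetical helper that builds the same request object as above, and the array of submitted messages stands in for the stream; the timestamp is passed in so the result is predictable.

```javascript
// Hypothetical helper: builds the request description for one message.
function toRequest(message, now) {
  return {
    method: 'POST',
    url: 'http://localhost:4567/messages',
    headers: {
      'Content-Type': 'application/json',
    },
    send: {
      time: now,
      text: message,
      username: 'pusher',
    },
  };
}

// An array standing in for the values emitted by the submit stream.
const submittedMessages = ['hello', '', 'world'];
const now = new Date(0);

const requests = submittedMessages
  .filter((message) => message !== '') // drop blank messages
  .map((message) => toRequest(message, now));

console.log(requests.map((request) => request.send.text));
```

Note that these are only descriptions of requests. Nothing is sent at this point; performing the request is the driver's job.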

Putting it all together

Having written all this code inside the main function we’re now ready to pass our application’s streams back to CycleJS so it can work its magic and get our application running. As explained earlier, the main function should return an object where each key is named after a CycleJS driver, and the value is the input (or sink, in CycleJS terms) that the driver should listen to. All we need to do is return an object with our DOM and HTTP sinks:

function main(sources) {
  // code omitted

  return {
    DOM: view(state$),
    HTTP: request$
  }
}

The observable for the DOM driver should be the result of calling the view function, passing in the state$ observable. The view function returns a stream of virtual DOM elements, which is exactly what the DOM driver expects to be given. We give the HTTP driver our request$ stream, which emits objects representing HTTP requests that should be made.

Finally we can create our drivers and call Cycle.run to get things started:

const drivers = {
  DOM: makeDOMDriver('#app'),
  // we don't listen to the response
  // so we need to tell it to make HTTP requests anyway
  HTTP: makeHTTPDriver({ eager: true })
};

Cycle.run(main, drivers);

Note the { eager: true } argument to makeHTTPDriver. By default, HTTP requests won’t be made unless something is subscribed to the stream of responses: CycleJS avoids making requests if it doesn’t think anything cares about the result. In our case we’re not listening out for the result but we still want the requests to be made, hence the extra argument.
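
The difference between lazy and eager can be shown with a small plain JavaScript model. createResponseStream is a hypothetical function written for this sketch and says nothing about how the HTTP driver is implemented internally; it only illustrates the idea that a lazy request waits for a subscriber while an eager one fires straight away.

```javascript
// Toy model of a lazy request: sendRequest only runs once something
// subscribes, unless the eager option is set. All names are made up.
function createResponseStream(sendRequest, options = {}) {
  let started = false;
  const start = () => {
    if (!started) {
      started = true;
      sendRequest();
    }
  };
  if (options.eager) {
    start(); // fire immediately, whether or not anyone listens
  }
  return {
    subscribe() {
      start(); // a lazy stream fires on its first subscription
    },
  };
}

let lazyCalls = 0;
const lazy$ = createResponseStream(() => { lazyCalls += 1; });
const lazyCallsBeforeSubscribe = lazyCalls;
lazy$.subscribe();

let eagerCalls = 0;
createResponseStream(() => { eagerCalls += 1; }, { eager: true });

console.log(lazyCallsBeforeSubscribe, lazyCalls, eagerCalls);
```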

Conclusion

I hope this article has given you a sense of how we can build applications using Cycle and Observables. Representing data as streams can take some getting used to, but it’s a great way to work once you’ve adjusted. Cycle’s drivers abstract away side effects and its virtual DOM handles re-rendering, which frees you from worrying about when HTTP requests will be made or when the DOM needs updating. I definitely encourage you to give it a try.

You can find all the code on GitHub and if you’d like to dive into Cycle further I can highly recommend the CycleJS Fundamentals video series on Egghead which does a great job of walking you through the framework.