
Streams

Streams is Appbase’s streaming architecture built on Elasticsearch. Its features range from streaming of documents and queries to TTL and timed notifications, and all of them aim to be compatible with Elasticsearch’s API. Streams also offers HTTP basic authentication and SSL, and is compatible with Elasticsearch 2.x and 5.x.

Streams is highly performant; it’s implemented on top of Nginx using LuaJIT, a very fast implementation of the Lua language. We leverage OpenResty, a bundle of Nginx, LuaJIT, and many other high-quality modules used by many successful projects!

Streams is Docker-based and pretty easy to deploy — anywhere, in front of arbitrary Elasticsearch clusters. Read on!

Table of Contents

  • Overview
  • Installation
  • SSL configuration
  • Architecture & Deployment Scenarios
  • Sample use cases

Overview

Streams is a transparent streaming layer built on top of Elasticsearch. It currently supports the following features:

  1. Streaming of
    • Updates or deletion of a given document
    • New documents matching a query
  2. TTL: time to live at index level
  3. A batched streaming mode: Streams delivers new documents matching the query you passed, just like the query streaming in 1., but a fixed number of times and at the interval you want
  4. SSL and HTTP basic authentication

Check out our Sample Use Cases section.

Installation

Some guidelines on how to run Streams.

Elasticsearch

Streams can be deployed in front of an entire cluster or any subset of nodes, including a single node; each scenario has its pros and cons. For this tutorial, let’s run an Elasticsearch node locally, using Docker.

Let’s create a Docker network to house both Elasticsearch and Streams:

docker network create streams

Let’s now run Elasticsearch:

docker run -d --name=es -p 9200:9200 --net=streams elasticsearch:5
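
You can check that the node is up (it may take a few seconds to start accepting connections):

# Elasticsearch answers with basic cluster info on port 9200
curl localhost:9200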

Streams

The simplest possible way to run Streams, although not recommended, is with SSL and HTTP basic auth disabled:

docker run -d --name=streams -p 80:80 --net=streams \
  -e ES_NODES=es:9200 -e SSL_OFF=true -e AUTH_OFF=true \
  -e DNS_SERVER=127.0.0.11 store/appbaseio/streams:2.1.2

However, in the real world it’s a good idea to have both enabled; to do so, run:

docker run -d --name streams -p 443:443 \
  -e ES_NODES=es:9200 -e DNS_SERVER=127.0.0.11 \
  -e AUTH_USER=user -e AUTH_PASS=pass \
  -v someDir:/ssl store/appbaseio/streams:2.1.2

Here, someDir is a directory where your SSL certificate and key are located (see SSL configuration). After this, you’ll be able to access Streams at https://localhost, with basic authentication user user and password pass. Note that we refer to our Elasticsearch node by its container name and use Docker’s built-in DNS resolver, available at 127.0.0.11.
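
For a quick sanity check, hit the proxy with curl; the -k flag skips certificate verification, which you’ll need if you’re testing with a self-signed certificate:

# Verify that Streams is up and answering behind basic auth
curl -k -u user:pass "https://localhost/"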

All options supported by Streams are passed in as environment variables; a summary of them:

  • ES_NODES: comma-delimited list of Elasticsearch nodes, as in 172.18.0.3:9200,172.18.0.4:9200
  • SSL_OFF: disable SSL and serve plain HTTP instead (NOT recommended for production)
  • AUTH_OFF: disable HTTP basic authentication
  • AUTH_USER: username for HTTP basic auth
  • AUTH_PASS: password for HTTP basic auth
  • DNS_SERVER: address of your name server
  • DOMAIN_NAME: domain name for Streams
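
Putting these together, a run with every option set might look like the sketch below (the domain name is a placeholder; the rest mirrors the commands above):

docker run -d --name=streams -p 443:443 --net=streams \
  -e ES_NODES=es:9200 -e DNS_SERVER=127.0.0.11 \
  -e DOMAIN_NAME=streams.example.com \
  -e AUTH_USER=user -e AUTH_PASS=pass \
  -v someDir:/ssl store/appbaseio/streams:2.1.2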

SSL configuration

Streams allows for generic SSL certificate configuration; the directory /ssl in the container is expected to contain two files, a certificate and a private key, e.g. CERTIFICATE.crt and PRIVATE_KEY.key. The only requirement is that the certificate’s name ends in crt and the key’s in key; Streams will detect the files using this rule and set them up in Nginx.

-v some_dir_with_pair:/ssl

some_dir_with_pair is a directory containing the certificate-key pair.
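
If you just want to try SSL locally, a self-signed pair can be generated with openssl; the file names below are arbitrary, as long as the certificate ends in crt and the key in key:

# Generate a self-signed certificate-key pair (for local testing only)
mkdir -p some_dir_with_pair
openssl req -x509 -newkey rsa:2048 -nodes -days 365 \
  -subj "/CN=localhost" \
  -keyout some_dir_with_pair/PRIVATE_KEY.key \
  -out some_dir_with_pair/CERTIFICATE.crt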

Architecture & Deployment Scenarios

Below is a block diagram of the Streams architecture.

                     .---------------------.
                     |       Streams       |
                     |                     |
                     |    .-----------.    |
                     |    |           |    |
       Req 1         |    |           |    |
     <------------------> |  streams  |    |
       Req 2 ?stream |    |  port 80  |    |
     -------------------> |   / 443   |    |
                     |    |           |    |
                     |    .-----------.    |     .-----------------.
                     |          .          |     |  Elasticsearch  |
                     |          |       <------> |    upstream     |
                     |          .          |     .-----------------.
                     |    .-----------.    |
                     |    |           |    |
       Req 2         |    |  pub/sub  |    |
     <------------------- | localhost |    |
                     |    | port 5678 |    |
                     |    |           |    |
                     |    .-----------.    |
                     |                     |
                     |                     |
                     .---------------------.

Here, Req 1 is a normal, non-streaming request; it’s proxied straight to Elasticsearch. Req 2, on the other hand, carries the stream argument and opens a streaming channel: the request is kept open, and matching documents are delivered through an internal pub/sub virtual server.

As a proxy to Elasticsearch, Streams can be deployed in front of each cluster node or any subset of them.
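
To make the two paths concrete, here are the two request types from the diagram (assuming SSL is disabled and $streams points at the proxy, as in the examples below):

# Req 1: a normal request, proxied straight to Elasticsearch
curl -XGET "$streams/blog/post/1"

# Req 2: a streaming request, kept open by the pub/sub virtual server
curl -XGET "$streams/blog/post/1?stream=true"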

Sample use cases

To test Streams locally, use run_dev.sh to provision a local test environment; it runs one container for Streams itself and one for Elasticsearch. With no arguments, run_dev.sh creates an Elasticsearch v2 container; run_dev.sh -2 has the same effect, and run_dev.sh -5 runs an Elasticsearch v5 container.
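
In other words:

# Elasticsearch 2.x (the default)
./run_dev.sh
./run_dev.sh -2

# Elasticsearch 5.x
./run_dev.sh -5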

Running Streams

# Start Elasticsearch
docker run -d --name=es -p 9200:9200 elasticsearch:2.4

# Start Streams; assume our Elasticsearch node is at 172.17.0.2:9200
docker run -d --name=streams -p 80:80 -e ES_NODES=172.17.0.2:9200 -e SSL_OFF=true -e AUTH_OFF=true appbaseio/streams-preview

# Point a shell variable at the Streams host; 'localhost' here
streams=localhost

Basic features & Streaming

# Create a document and store
curl -XPUT "$streams/blog/post/1" -d '{"content":"Hello, world!"}'

# Get the newly created document
curl -XGET "$streams/blog/post/1"

# Delete the document
curl -XDELETE "$streams/blog/post/1"

# Create the document again, this time with POST
curl -XPOST "$streams/blog/post" -d '{"content":"Hello, world!"}'

# Register a query and listen
curl -XGET "$streams/blog/_search?stream=true" -d '{"query":{"match":{"content":"appbase"}}}'

# Post something matching that query
curl -XPOST "$streams/blog/post" -d '{"content":"appbase for the realtime web"}'

# Post something matching that query - don't store this time
curl -XPOST "$streams/blog/post?store=false" -d '{"content":"this is appbase Streams"}'

# Query all blog posts and see that the previous post hasn't been stored
curl -XPOST "$streams/blog/post/_search?pretty" -d '{"query":{"match_all":{}}}'

# Post something else and see that it is not streamed
curl -XPOST "$streams/blog/post" -d '{"content":"another post"}'

# Now query all blog posts again to see which ones were stored
curl -XGET "$streams/blog/post/_search" -d '{"query":{"match_all":{}}}'

# Let's insert a new document
curl -XPOST "$streams/blog/post/123" -d '{"content":"a blog post"}'

# Now let's listen to changes in that document
curl -XGET "$streams/blog/post/123?stream=true"

# Now let's update that document and watch the streamed update
curl -XPOST "$streams/blog/post/123/_update" -d '{"doc":{"content":"a new blog post"}}'

# Now let's delete that document and watch the deletion report
curl -XDELETE "$streams/blog/post/123"

# Now get info about active streams (subscribers) and messages state (requires exposing port 5678 during docker run)
curl -XGET "$streams:5678/_streams/debug"
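
For that last call to work, port 5678 has to be published when the container is started; a variation of the earlier run command (everything else unchanged):

docker run -d --name=streams -p 80:80 -p 5678:5678 \
  -e ES_NODES=172.17.0.2:9200 -e SSL_OFF=true -e AUTH_OFF=true \
  appbaseio/streams-preview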

Time to Live (TTL)

TTL enables defining a Time to Live for indices; it works on both PUT and POST requests at index creation time. It’s enabled with a URL query argument named ttl, whose value obeys the format Xu, where X is an integer and u is one of s, m, h or d, for seconds, minutes, hours or days.

# Issue a GET request on a non-existent index and check the 404 status
curl -XGET "$streams/someindex"

# Create the index with a 10-second TTL
curl -XPUT "$streams/someindex?ttl=10s"

# Check quickly that the index still exists
curl -XGET "$streams/someindex"

# After 10 seconds, a 404 is returned
curl -XGET "$streams/someindex"
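
Since TTL applies to whichever request creates the index, a POST that indexes a first document should behave the same way; a sketch (the index and field names here are arbitrary):

# Implicitly create an index with a 1-hour TTL while indexing a document
curl -XPOST "$streams/anotherindex/type?ttl=1h" -d '{"content":"short-lived"}'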

Time Queries

Time queries work in a similar fashion to streaming queries, but they take two extra parameters: interval and count. interval defines how often new matching documents are streamed, and count defines how many times that cycle repeats. As with streaming queries, the query must refer only to fields present in the type mapping.

# Index a document to make sure the mapping exists
curl -XPOST "$streams/blog/post" -d '{"content":"Hello, world!"}'

# Create a new time query which should deliver results 50 times, every 5 seconds; these results are documents containing "appbase" in the field "content"
curl -XPOST "$streams/blog/post/_timequery?interval=5s&count=50" -d '{"query":{"match":{"content":"appbase"}}}'

# The listening endpoint looks similar, except that the request is a GET and only the query is specified
curl -XGET "$streams/blog/post/_timequery" -d '{"query":{"match":{"content":"appbase"}}}'

# Now, any new document matching the query will be streamed to listeners every 5 seconds, 50 times
curl -XPOST "$streams/blog/post" -d '{"content":"hello appbase"}'

© 2017 Appbase, Inc