Sunday, 4 August 2013

MongoDB in 3 Easy Steps!

If  you're entirely new to MongoDB - this is just a very gentle introduction that gets you up and running with a sample database in just a few easy steps.

It's really easy to get started. The steps are:

1. Install MongoDB
2. Create a Database
3. Query the Database

1. Install MongoDB

  • The mongoDB site has simple, clear instructions for downloading and installing. Just click on the link below and follow the instructions for your particular operating system (much clearer that trying to explain them again here!):
      http://docs.mongodb.org/manual/installation/
  • (Make sure you have the have started the 'mongod' server process as explained in these instructions - you should have a mongod server up and running before moving on to Step 2 - like in this screenshot below)

2. Create A Database

  • Now we have a server up and running - the next step is to create a database and pop some data in. For our example we have a very simple Library database, with a single 'book' table (or 'collection' in mongo-speak).
  • Instead of running SQL against mongo - you use Javascript - an example build file is included below (you could enter the commands in the interactive console window - shown later - but for speed and ease of use - you can also run them all together from a script file - which is the approach we use here)
  • Copy the javascript code from the GIST below, and save it to a file called 'library_build_script.js' build script on your machine


  • Open a new console window, navigate to the 'bin' folder in the extracted mongo installation files,  and use the mongo command to load the script - just type:
mongo SCRIPT_LOCATION/library_build_script.js'

  • (Note: if you set the PATH up on your machine, you can of course run the mongo executables from anywhere)
  • this will run the build script to create the database, collection and indexes, and prepopulate with some test data.
  • Note - we are using getSiblingDB() to create and work with a sibling database. This is slightly more complex than the default, but if you create multiple databases, this helps organise them better (as you build databases in a tree like structure). It keeps it easier to manage/view multiple databases on the same server. Have a play around with different options when you get up and running to see what works best for you.

 

3. Query the Data

  • It's as simple as that to have a populated database up and running on your machine - we can now use the interactive mongo command line tool to query the data
  • Start up the interactive mongo console. Still in the 'bin' folder, type
mongo

  • Find All Books Query - list all books in the database
db.getSiblingDB("library").book.find()


  • Find All Books With Type 'Fantasy' Only
db.getSiblingDB("library").book.find({'type':'Fantasy'})
   


As you can see - it's really easy to get up and running in just a few minutes. Of course there's much more to cover - but this should be a good starting point to start playing.

I'll hopefully be expanding on the Library database in some future posts - exploring more advanced features and functions.

 

Addendum: Using A GUI Client

You can also use a GUI client to explore your local MongoDB database if you prefer that to using the command line.

Check out UMongo - you can download it for free from the website here:

http://edgytech.com/umongo/

Just follow the installation instructions for your OS, start it up and create a connection to your local mongod server.



You can then interact with your local mongo instance using the tools in UMongo




Thursday, 1 August 2013

JPA Searching Using Lucene - A Working Example with Spring and DBUnit

Working Example on Github

 


There's a small, self contained mavenised example project over on Github to accompany this post - check it out here: https://github.com/corsoft/jpa-lucene-spring-demo


Running the Demo


See the README file over on GitHub for details of running the demo. Essentially - it's just running the Unit Tests, with the usual maven build and test results output to the console - example below. This is the result of running the DBUnit test, which inserts Book data into the HSQL database using JPA, and then uses Lucene to query the data, testing that the expected Books are returned (i.e. only those int he SCI-FI category, containing the word 'Space', and ensuring that any with 'Space' in the title appear before those with 'Space' only in the description.



The Book Entity


Our simple example stores Books. The Book entity class below is a standard JPA Entity with a few additional annotations to identify it to Lucene:

@Indexed - this identifies that the class will be added to the Lucene index. You can define a specific index by adding the 'index' attribute to the annotation. We're just choosing the simplest, minimal configuration for this example.

In addition to this - you also need to specify which properties on the entity are to be indexed, and how they are to be indexed. For our example we are again going for the default option by just adding an @Field annotation with no extra parameters. We are adding one other annotation to the 'title' field - @Boost - this is just telling Lucene to give more weight to search term matches that appear in this field (than the same term appearing in the description field).

This example is purposefully kept minimal in terms of the ins-and-outs of Lucene (I may cover that in a later post) - we're really just concentrating on the integration with JPA and Spring for now.

The Book Manager


The BookManager class acts as a simple service layer for the Book operations - used for adding books and searching books. As you can see, the JPA database resources are autowired in by Spring from the application-context.xml. We are just using an in-memory hsql database in this example.

application-context.xml


This is the Spring configuration file. You can see in the JPA Entity Manager configuration the key for 'hibernate.search.default.indexBase' is added to the jpaPropertyMap to tell Lucene where to create the index. We have also externalised the database login credentials to a properties file (as you may wish to change these for different environments), for example by updating the propertyConfigurer to look for and use a different external properties if it finds one on the file system).


Testing Using DBUnit


In the project is an example of using DBUnit with Spring to test adding and searching against the database using DBUnit to populate the database with test data, exercise the Book Manager search operations and then clean the database down. This is a great way to test database functionality and can be easily integrated into maven and continuous build environments.

Because DBUnit bypasses the standard JPA insertion calls - the data does not get automatically added to the Lucene index. We have a method exposed on the service interface to update the Full Text index 'updateFullTextIndex()' - calling this causes Lucene to update the index with the current data in the database. This can be useful when you are adding search to pre-populated databases to index the  existing content.

The source data for the test is defined in an xml file

Wednesday, 31 July 2013

502 Proxy Error Using CometD, Apache and Camel

We recently hit an issue during testing with Apache returning "502 Proxy Errors" to clients who were connecting via CometD (using Apache as a proxy server in front of a Camel/Jetty CometD server).

<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html><head>
<title>502 Proxy Error</title>
</head><body>
<h1>Proxy Error</h1>
<p>The proxy server received an invalid
response from an upstream server.<br />
The proxy server could not handle the request <em><a href="/cometdServer/connect">POST&nbsp;/cometdServer/connect</a></em>.<p>
Reason: <strong>Error reading from remote server</strong></p></p>
</body></html>


Our Setup

We are using a combination of CometD, Camel (in ActiveMQ) and Apache to broadcast messages from our server to subscribed browser clients.
  • CometD is a scalable HTTP-based event routing bus that uses an Ajax Push technology pattern known as Comet.
  • Comet is a web application model using long-held HTTP requests which allow the web server to ‘PUSH’ data to a browser, without the browser specifically requesting it.
  • We have Camel sitting on an ActiveMQ server. On this we have routes set up to process incoming JMS messages. These messages get processed, and then the output from this process (in the form of a JSON message) is sent to a CometD endpoint defined in the Camel route, for example:
cometd://0.0.0.0:9099/mybroadcastchannel 


  • This endpoint uses the Camel CometD module to run a Jetty instance on port 9099, which handles the CometD endpoint.
  • The browser clients connect to this endpoint via an Apache server. We do this to make use of Apaches mod_proxy module to act as a proxy to the CometD endpoint, e.g. in proxy.conf:
ProxyPass /cometdServer http://camelserver:9099/cometd

ProxyPassReverse /cometdServer http://camelserver:9099/cometd


  • This allows us to easily ‘PUSH’ messages out to subscribed browser clients to inform them of the results of processing taking place inside our Camel messaging routes (the kind of thing you would typically implement using JMS Topics in a java desktop/swing type solution)

Known Problems with Apache and CometD Long Polling

As a general rule, using Apache as a proxy in front of a CometD long-polling/Bayeux server is not a good idea.  This is due to Apaches ‘thread-per-request’ model – which introduces problems when scaling this solution.

This is described in more detail here:
http://cometd.org/node/81

The problem is caused by the Apache proxy server timing out before the Jetty\Camel server (using the default configuration). By default, Apache Timeouts are set to 2 minutes (as defined globally in httpd.conf).
Over on the Camel\CometD side, the default timeout is set to 4 minutes (240000 milliseconds)
http://camel.apache.org/cometd.html

Under certain conditions – if the CometD endpoint holds the connection for longer than 2 minutes, it causes a timeout on the Apache proxy, which then disconnects the CometD connection on the client (the length it holds the connection open seems to vary, but is typically much shorter than this)

Solution 1

Increase the timeout on the Apache Proxy server to be greater than 4 minutes. You can either do this globally, or set it specifically on each ProxyPass configuration (in proxy.conf)

ProxyPass /cometdServer http://camelserver:9099/cometd timeout=250

ProxyPassReverse /cometdServer http://camelserver:9099/cometd timeout=250


Solution 2

Decrease the timeout on the Camel server to be less than 2 minutes, e.g.

cometd://0.0.0.0:9099/mybroadcastchannel?timeout=110000


Useful Tools for Debugging

I found these tools invaluable in getting to the bottom of what was happening (and digging into the details of the CometD messages)

Chrome Developer Tools

Useful for monitoring the network traffic between the browser and the server – lets you easily see the details of each HTTP interaction (on the ‘Network’ tab). Of particular use here was seeing the Timings – you could see that the request which failed did so at a 2 minutes point

Apache Server-Status Module

Enabling apache mod_status was useful to confirm that the problem was not thread related (as mentioned in the known issues with Apache/CometD above)
This lets you view what the current worker threads are doing on Apache. Instructions on how to enable can be found here:

http://httpd.apache.org/docs/2.2/mod/mod_status.html




Friday, 24 May 2013

Analysing REST Web Services with Squid and AWStats

Background

I've got a Java application running on Tomcat on Amazon EC2, which exposes RESTful web services delivering JSON to a variety of mobile apps and web sites.

I'm running a Squid server as a reverse proxy caching in front of the web services, to cache defined content at the HTTP layer.

(I'm also caching in the application layer using Ehcache and Spring - but adding the Squid cache in front of this lets me cache more content that just the JSON, and lets me take advantage of Squids proxy functions to manage back end services easily. It also takes more load off the Tomcat server - which is also running scheduled tasks to update the data in the system.)

This lets me run a pretty mixed environment, which suits my current purposes for delivering speedy content, removes load on my main app server and also doesn't tie me to closely to any particular technical implementation in the back end. There is a complexity overhead, but part of the reason for doing this is to play with the technologies - so thats all part of the fun!

At the moment, all this is running on a single EC2 Ubuntu server instance.

All HTTP traffic comes into the Squid server, running on Port 80. This caches certain html, image, json, php, etc content (as defined in /etc/squid3/squid.conf) - and if it can't find the request in the cache (or has been told not to cache it), will hand off to one of the upstream servers to service the request - Tomcat (on port 8080 - serving my Java web services, and admin App) or Apache (on port 9898 - serving my web sites, PHP/HTML).

So my basic setup is similar to this diagram:




The Problem - How to Analyse Web Service Traffic?

What I want to do is be able to track visitors across all web sites and mobile apps. I can track website visitors fine using Google Analytics. However, the mobile apps access the web services directly - so a client based tracker like Analytics cannot track these requests..

As all requests are routed through Squid - all access data sits in the squid 'access.log'. This is the only place that knows about all the traffic (as this will log all access, whether delivered from the cache, or from the backend servers)

Although Squid provides the cachemgr tool - this is more geared to monitoring cache access, than actual detail of what is delivered, where and to who (which is the kind of info I'm more interested in)

 

My Solution

Looking at the various log analysis tools available - I decided to give AWStats a try - it's free, seems well documented and commonly used

My goal was to get this AWStats set up on my Ubuntu box to read the Squid logs and provide some nice HTML output - updated on a regular basis so I could monitor usage through the day

So my plan was to end up with something like this:


Prerequisites

  • Before I started - I already had these set up and running on my Ubuntu (version 12) box
    • Squid Server (running on port 80)
    • Apache (running on port 9898)

 

Step 1 : Install AWStats

The first step was just to install AWStats on Ubuntu using these apt commands (note: the last 2 are only needed if you want to track stats by country):

sudo aptitude install awstats 
sudo aptitude install libnet-ip-perl 
sudo aptitude install libgeo-ipfree-perl

Step 2 : Configure Squid Logging

The next step is to change the format of the Squid logs. By default, Squids access.log is principally designed to only log the kind of info useful for logging caching activity (what was requested, when, was it served from the cache..)

An example of this is below
1369117923.612     26 209.20.75.224 TCP_MISS/200 4204 GET http://.../rest/feeds/v3/feeditem/9932/ - FIRST_UP_PARENT/tomcat application/json

1369117947.802      0 209.20.75.224 TCP_MEM_HIT/200 12130 GET http://.../rest/feeds/v3/latest/WH/2000-01-01-00-00? - NONE/- application/json

1369118022.040    139 209.20.75.224 TCP_MISS/200 12929 GET http://.../rest/feeds/v3/search/JAVA/JSF - FIRST_UP_PARENT/tomcat application/json

1369118027.240      0 209.20.75.224 TCP_MEM_HIT/200 12130 GET http://.../rest/feeds/v3/latest/WH/2000-01-01-00-00? - NONE/- application/json

1369118041.264      0 209.20.75.224 TCP_MEM_HIT/200 12130 GET http:/.../rest/feeds/v3/latest/WH/2000-01-01-00-00? - NONE/- application/json

1369118062.362      0 209.20.75.224 TCP_MEM_HIT/200 12130 GET http://.../rest/feeds/v3/latest/WH/2000-01-01-00-00? - NONE/- application/json

1369118062.811      0 96.28.139.57 TCP_MEM_HIT/200 9397 GET http://.../rest/image/38 - NONE/- image/png

1369118062.830      0 96.28.139.57 TCP_MEM_HIT/200 6936 GET http://.../rest/image/44 - NONE/- image/png


For AWStats to analyse it, and provide more information - we need to change the logging format to a more Apache style log. Something like this:
92.40.254.172 - - [23/May/2013:14:52:25 +0000] "GET http://.../rest/image/34 HTTP/1.1" 200 5043 "-" "Mozilla/5.0 (Linux; U; Android 2.3.6; en-gb; U8815 Build/HuaweiU8815C02B895) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1" TCP_MEM_HIT:NONE

86.150.72.140 - - [23/May/2013:14:53:26 +0000] "GET http://.../rest/feeds/v3/jsonp/feeditem/20112? HTTP/1.1" 200 5277 "-" "Mozilla/5.0 (Linux; U; Android 2.2; en-gb; HTC Desire Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1" TCP_MISS:FIRST_UP_PARENT

86.150.72.140 - - [23/May/2013:14:53:39 +0000] "GET http://.../rest/feeds/v3/jsonp/feeditem/20109? HTTP/1.1" 200 7114 "-" "Mozilla/5.0 (Linux; U; Android 2.2; en-gb; HTC Desire Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1" TCP_MISS:FIRST_UP_PARENT

To change the /var/log/squid3/acccess.log logging format - add the following lines to /etc/squid3/squid.conf
logformat combined %>a %ui %un [%{%d/%b/%Y:%H:%M:%S +0000}tl] "%rm %ru HTTP/%rv" %Hs %<st "%{Referer}>h" "%{User-Agent}>h" %Ss:%Sh

access_log /var/log/squid3/access.log combined

Stop Squid, backup up and remove the old /var/log/squid3/access.log and restart squid - now all logging from this point on will use this new file format - which AWStats will process
sudo stop squid3
sudo mv /var/log/squid3/access.log /var/log/squid3/access.log.backup
sudo start squid3

 

Step 3 : Configure AWStats

We need to tell AWStats which log file we want to use, and what the format is.

There are various ways you can configure your domain in AWStats. For simplicity here - I'm just assuming we have one domain and just use the 'out-the-box' awstats.conf configuration (I believe it is common practice to have different conf files for each domain - but we'll just use the default for now).

Find the following lines in '/etc/awstats/awstats.conf' - and change them to these settings (see more details of setting the LogFormat here):
LogFile="/var/log/squid3/access.log
LogFormat=1
SiteDomain=yourdomainname

 

Step 4 : Generating A Report

Now we have AWStats installed and configured, and Squid logging the correct format, we can generate an analysis report. Use the following command to do this:
sudo /usr/lib/cgi-bin/awstats.pl -config=awstats.conf –update

This should give you aome output similar to:
sudo /usr/lib/cgi-bin/awstats.pl -config=awstats.conf -update
Create/Update database for config "/etc/awstats/awstats.conf" by AWStats version 7.0 (build 1.971)
From data in log file "/var/log/squid3/access.log"...
Phase 1 : First bypass old records, searching new record...
Direct access after last parsed record (after line 412)
Jumped lines in file: 412
 Found 412 already parsed records.
Parsed lines in file: 53
 Found 25 dropped records,
 Found 0 comments,
 Found 0 blank records,
 Found 0 corrupted records,
 Found 0 old records,
 Found 28 new qualified records.

 

Step 5 : Configuring Apache/Squid To View Reports

Now we have AWStats installed, and Squid logging the correct format, the next step is to setup Apache and Squid to view the reports.

We want to view the reports on the URL: http://yourdomainname/statistics/awstats.pl

Because all our traffic goes through Squid - we just need to add some directives to '/etc/squid3/squid.conf' to redirect any urls going via '/statistics' directly to the Apache server. We also map '/awstats' urls to the apache server too - to catch the css and js files AWStats references.

# allow access to the awstats.pl - redirect to Apache
acl statisticsAcl url_regex -i (/statistics)
cache deny statisticsAcl
http_access allow statisticsAcl
cache_peer 127.0.0.1 parent 9898 0 no-query originserver name=statisticsPeer
cache_peer_access statisticsPeer allow statisticsAcl

# allow access to the awstats css, js, etc - redirect to Apache
acl awstatsAcl url_regex -i (/awstats)
cache deny awstatsAcl
http_access allow awstatsAcl
cache_peer 127.0.0.1 parent 9898 0 no-query originserver name=awstatsPeer
cache_peer_access awstatsPeer allow awstatsAcl

We then need to configure Apache to work with AWStats. We can easily do this by creating a new file '/etc/apache2/conf.d/statistics' (with the following content):
Alias /awstatsclasses "/usr/share/awstats/lib/"
Alias /awstats-icon/ "/usr/share/awstats/icon/"
Alias /awstatscss "/usr/share/doc/awstats/examples/css"
ScriptAlias /cgi-bin/ /usr/lib/cgi-bin/
ScriptAlias /statistics/ /usr/lib/cgi-bin/
Options ExecCGI -MultiViews +SymLinksIfOwnerMatch

Last step is just to restart Apache and Squid
sudo /etc/init.d/apache2 restart
sudo stop squid3
sudo start squid3

You should then be able to access your stats on 'http://yourdomainname/statistics/awstats.pl':



 

Step 6 : Scheduling as a Cron Job

If you want to schedule the stats to be updated (say, every 30 minutes) - just add this to '/etc/crontab'
*/30 * * * * root /usr/lib/cgi-bin/awstats.pl -config=awstats.config -update > /dev/null

Step 7 : Option - Generate Static Web Pages

So far - everything we do calls the awstats.pl perl script which re-generates the same web pages dynamically with the latest info.

If you want to - you can also generate static web pages. For example - I want to generate static pages - a different set each day (with the year, day and month in the HTML page title).

You can do this with the following command:

sudo /usr/share/awstats/tools/awstats_buildstaticpages.pl -awstatsprog=/usr/lib/cgi-bin/awstatsl.pl -config=awstats.conf -dir=/var/www/awstats-report -builddate=%YYYY-%MM-%DD

This will create a set of pages in '/var/www/awstats-report/' with the format 'awstats.awstats.conf.130524.html' (note: you will have to create the /var/www/awstats-report/ directory with the correct permissions.
You will have to add a new set of rules to '/etc/squid3/squid.conf' to:

# allow access to the awstats static reports folder - redirect to Apache
acl awstatsReportAcl url_regex -i (/awstats-report)
cache deny awstatsReportAcl
http_access allow awstatsReportAcl
cache_peer 127.0.0.1 parent 9898 0 no-query originserver name=awstatsReportPeer
cache_peer_access awstatsReportPeer allow awstatsReportAcl

Similarly - you can schedule this to run on a regular basis. For example, to run at 55 minutes past the hour, every hour - add this line to '/etc/crontab'
55 * * * * root /usr/share/awstats/tools/awstats_buildstaticpages.pl -awstatsprog=/usr/lib/cgi-bin/awstatsl.pl -config=awstats.conf -dir=/var/www/awstats-report -builddate=%YYYY-%MM-%DD

You can then access the static pages on the url: 'yourdomainname/awstats-report/awstats.awstats.conf.130523.html' 



Conclusion

Take all the above with a pinch of salt - I'm sure there are better ways to do some of it, it was learning experience for me - and at the end, delivered what I was after.

I think Squid is a great product, especially for a developer like myself who wants to play with different tech and swap it in/and out of my environment. It's proxying abilities allow you to swap implementatations and server locations with relative ease.

And in it's main role - as a reverse proxy web acceleration server - it's been performing excellently, speeding up web service and image serving with never a grumble. Now I have AWStats up and running - I can drill down and get a lot more information about what's going on under the covers!


Wednesday, 17 April 2013

CometD and Camel in the Enterprise - A Working Example

Full code is available on GitHub

All the Java code for the project is available in Github - requires Maven to run. See the README for more details - link here.

The Key Tech Players

The demo shows these various pieces of kit working (hopefully) harmoniously together:

What's the Scenario?

  • Imagine a complex enterprise infrastructure, running a bunch of different systems, servers and technologies, with data flowing between them all.
  • Commonly there will be some sort of Messaging system in play - handling routing and transformation of the various message flows. In our example we have ApacheMQ as the Messaging Server.
  • The endpoints of some of these flows may be to send data to multiple clients. One way to do this is to route messages to a Topic, and have many clients subscribed to that topic - this is easy of the clients are Java, but not so easy if the clients are Browsers.
  • Typically browser clients will have to use a hand rolled polling approach to check for new data being available.
  • Using CometD - the Browser can use some javascript to subscribe to a CometD channel, keeping a long running connection open - and the endpoint of that channel can push data down directly to all its subscribers. Handily, Apache Camel comes with a CometD plugin that can handle all this on the server side for us.

The CometD and Camel Demo

The demo shows HTML messages being sent to an ActiveMQ JMS Queue, from where they are picked up by Camel and then routed to a CometD channel endpoint.

A HTML page consumes messages from the CometD channel - and as they are received, adds them to a list on the web page. An example of the HTML page is shown below (I've added some JQuery to initially flash the images in red, and then fade them out to black over time - just to make it obvious as new messages arrive).

The messages are just simple, generated automatically from a Java client app, which pushes them to ActiveMQ using a Spring JMSTemplate.



This flow is shown in the diagram below. All three constituent parts can be run from the same project (but they must be started in the sequence shown - ActiveMQ/Camel server first, then web browser, then Java client)


Running the Demo

  • See the README file in Github for full details on how to run the demo (just remember they need to be started in the sequence above - ActiveMQ\Camel server needs to be running first for the browser to consume the cometd channel and the client to send JMS message)

The Camel Route

  • Properties are set in application.properties

  • The Camel Route - listens on a Queue and outputs to a CometD channel

The Javascript Consumer



The Java Client Message Dispatcher


The Java Client Message Creator


Future Extensions


  • Obviously this is a pretty simple demo - in a real world situation you may not want to be sending raw HTML down the channel. A better solution would be JSON, allowing the client to extract and format how it wanted to.
  • (And if you're using Camel - then you're probably doing a bunch of other conversions and processing in the space between receiving and publishing the data.)

Sunday, 3 March 2013

Spring ActiveMQ Producer Client Template

Available on GitHub

Demo available on GitHub

Overview

This is just a very small, very simple template project for sending messages to JMS queues.

In my current role we have a lot of different components and systems glued together by ActiveMQ and Camel. When developing\integration testing in this environment - it is often useful to create ad-hoc messages and send them to queues directly. This is usually the case when you need to test a subset of a larger system.

I usually knock up a test app to do this, often replicating this functionality time and time again. So I thought it might be useful to pop this into a simple template project that can be easily reused. It also made sense to 'mavenise' and 'springify' to make it more easily extensible if you need to plug in other components and features in tests.

Just to reiterate - this is just a starting point for developer style, ad hoc testing to support the development process.

Running the Demo

  1. Prerequisites
    1. Running instance of ActiveMQ (local/remote)
    2. Maven and JDK installed
  2. Configuring
    1. Edit application.properties to point to your ActiveMQ instance - specifying broker url and queue name
    2. Edit 'StartDemo.java' to provide the ObjectMessage the endpoint is expecting
    3. Type 'maven exec:java' to run the StartDemo class

application.properties


Looking at the Code

All the Spring configuration is set up in the main application-context.xml

application-context.xml



The ObjectMessages are created in the main StartDemo.java class

StartDemo.java

Communication with the server is abstracted away into a Dispatcher object.

MessageDispatcher.java

And that's about it really!

Conclusion 


As stated at the start - this is just a throwaway project I use as a starting point when I need to write something to send messages to an ActiveMQ Queue/

It's a compromise between something super simple (like a simple class), and something that lends itself easily to being extended. What can often start off as a simple test can grow into something more complex - and as we have maven and spring all configured at the start - it's then very easy to add in extra dependencies if you find you need them.

Sunday, 24 February 2013

Complex Event Processing Made Easy (using Esper)


The following is a very simple example of event stream processing (using the ESPER engine).

 

Note - a full working example is available over on GitHub:



What is Complex Event processing (CEP)?

Complex Event Processing (CEP), or Event Stream Stream Processing (ESP) are technologies commonly used in Event-Driven systems. These type of systems consume, and react to a stream of event data in real time. Typically these will be things like financial trading, fraud identification and process monitoring systems – where you need to identify, make sense of, and react quickly to emerging patterns in a stream of data events.

Key Components of a CEP system

A CEP system is like your typical database model turned upside down. Whereas a typical database stores data, and runs queries against the data, a CEP data stores queries, and runs data through the queries.

To do this it basically needs:
  • Data – in the form of ‘Events’
  • Queries – using EPL (‘Event Processing Language’)
  • Listeners – code that ‘does something’ if the queries return results

A Simple Example - A Nuclear Power Plant

Take the example of a Nuclear Power Station..




Now, this is just an example – so please try and suspend your disbelief if you know something about Nuclear Cores, Critical Temperatures, and the like. It’s just an example. I could have picked equally unbelievable financial transaction data. But ...

Monitoring the Core Temperature

 Now I don’t know what the core is, or if it even exists in reality – but for this example lets assume our power station has one, and if it gets too hot – well, very bad things happen..



Lets also assume that we have temperature gauges (thermometers?) in place which take a reading of the core temperature every second – and send the data to a central monitoring system.

What are the requirements?

We need to be warned when 3 types of events are detected:
  • MONITOR
    • just tell us the average temperature every 10 seconds - for information purposes
  • WARNING
    • WARN us if we have 2 consecutive temperatures above a certain threshold
  • CRITICAL
    • ALERT us if we have 4 consecutive events, with the first one above a certain threshold, and each subsequent one greater than the last – and the last one being 1.5 times greater than the first. This is trying to alert us that we have a sudden, rising escalating temperature spike – a bit like the diagram below. And let’s assume this is a very bad thing.

Using Esper

  • There are a number of ways you could approach building a system to handle these requirements. For the purpose of this post though - we will look at using Esper to tackle this problem
  • How we approach this with Esper is:
    • Using Esper – we can create 3 queries (using EPL - Esper Query Language) to model each of these event patterns.
    • We then attach a listener to each query - this will be triggered when the EPL detects a matching pattern of events)
    • We create an Esper service, and register these queries (and their listeners)
    • We can then just throw Temperature data through the service – and let Esper tell alert the listeners when we get matches.
    • (A working example of this simple solution is available on Githib - see link above)

Our Simple ESPER Solution

At the core of the system are the 3 queries for detecting the events.

Query 1 – MONITOR (Just monitor the  average temperature)

select avg(value) as avg_val 
from TemperatureEvent.win:time_batch(10 sec)

Query 2 – WARN (Tell us if we have 2 consecutive events which breach a threshold)


select * from TemperatureEvent "
 match_recognize ( 
   measures A as temp1, B as temp2 
   pattern (A B) 
   define  
     A as A.temperature > 400, 
     B as B.temperature > 400)

Query 3 – CRITICAL - 4 consecutive rising values above all above 100 with the fourth value being 1.5x greater than the first


select * from TemperatureEvent 
match_recognize ( 
measures A as temp1, B as temp2, C as temp3, D as temp4 
pattern (A B C D) 
define
A as A.temperature > 100, 
B as (A.temperature < B.value), 
C as (B.temperature < C.value), 
D as (C.temperature < D.value) and D.value > 
(A.value * 1.5))

Some Code Snippets

TemperatureEvent

  • We assume our incoming data arrives in the form of a TemperatureEvent POJO
  • If it doesn't - we can convert it to one, e.g. if it comes in via a JMS queue, our queue listener can convert it to a POJO. We don't have to do this, but doing so decouples us from the incoming data structure, and gives us more flexibility if we start to do more processing in our Java code outside the core Esper queries. An example of our POJO is below

package com.cor.cep.event;

package com.cor.cep.event;

import java.util.Date;

/**
 * Immutable Temperature Event class. 
 * The process control system creates these events. 
 * The TemperatureEventHandler picks these up 
 * and processes them.
 */
public class TemperatureEvent {

    /** Temperature in Celcius. */
    private int temperature;
    
    /** Time temerature reading was taken. */
    private Date timeOfReading;
    
    /**
     * Single value constructor.
     * @param value Temperature in Celsius.
     */
    /**
     * Temerature constructor.
     * @param temperature Temperature in Celsius
     * @param timeOfReading Time of Reading
     */
    public TemperatureEvent(int temperature, 
            Date timeOfReading) {
        this.temperature = temperature;
        this.timeOfReading = timeOfReading;
    }

    /**
     * Get the Temperature.
     * @return Temperature in Celsius
     */
    public int getTemperature() {
        return temperature;
    }
       
    /**
     * Get time Temperature reading was taken.
     * @return Time of Reading
     */
    public Date getTimeOfReading() {
        return timeOfReading;
    }

    @Override
    public String toString() {
        return "TemperatureEvent [" + temperature + "C]";
    }

}




Handling this Event

  • In our main handler class - TemperatureEventHandler.java, we initialise the Esper service. We register the package containing our TemperatureEvent so the EPL can use it.
  • We also create our 3 statements and add a listener to each statement
    

/**
 * Auto initialise our service after Spring bean wiring is complete.
 */
@Override
public void afterPropertiesSet() throws Exception {
    initService();
}


/**
 * Configure Esper Statement(s).
 */
public void initService() {

    Configuration config = new Configuration();
        
    // Recognise domain objects in this package in Esper.
    config.addEventTypeAutoName("com.cor.cep.event");
    epService = EPServiceProviderManager.getDefaultProvider(config);

    createCriticalTemperatureCheckExpression();
    createWarningTemperatureCheckExpression();
    createTemperatureMonitorExpression();
}


  • An example of creating the Critical Temperature warning and attaching the listener

   /**
     * EPL to check for a sudden critical rise across 4 events, 
     * where the last event is 1.5x greater than the first. 
     * This is checking for a sudden, sustained escalating 
     * rise in the temperature
     */
    private void createCriticalTemperatureCheckExpression() {
        
        LOG.debug("create Critical Temperature Check Expression");
        EPAdministrator epAdmin = epService.getEPAdministrator();
        criticalEventStatement = 
                epAdmin.createEPL(criticalEventSubscriber.getStatement());
        criticalEventStatement.setSubscriber(criticalEventSubscriber);
    }


  • And finally - an example of the listener for the Critical event. This just logs some debug - that's as far as this demo goes.

package com.cor.cep.subscriber;

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.cor.cep.event.TemperatureEvent;

/**
 * Wraps Esper Statement and Listener. No dependency on Esper libraries.
 */
@Component
public class CriticalEventSubscriber implements StatementSubscriber {

    /** Logger */
    private static Logger LOG = 
            LoggerFactory.getLogger(CriticalEventSubscriber.class);

    /** Minimum starting threshold for a critical event. */
    private static final String CRITICAL_EVENT_THRESHOLD = "100";
    
    /**
     * If the last event in a critical sequence is this much greater 
     * than the first - issue a critical alert.
     */
    private static final String CRITICAL_EVENT_MULTIPLIER = "1.5";
    
    /**
     * {@inheritDoc}
     */
    public String getStatement() {
        
        // Example using 'Match Recognise' syntax.
        String criticalEventExpression = "select * from TemperatureEvent "
                + "match_recognize ( "
                + "measures A as temp1, B as temp2, C as temp3, D as temp4 "
                + "pattern (A B C D) " 
                + "define "
                + "       A as A.temperature > " + CRITICAL_EVENT_THRESHOLD + ", "
                + "       B as (A.temperature < B.temperature), "
                + "       C as (B.temperature < C.temperature), "
                + "       D as (C.temperature < D.temperature) " 
                + "and D.temperature > "
                + "(A.temperature * " + CRITICAL_EVENT_MULTIPLIER + ")" + ")";
        
        return criticalEventExpression;
    }
    
    /**
     * Listener method called when Esper has detected a pattern match.
     */
    public void update(Map<string temperatureevent=""> eventMap) {

        // 1st Temperature in the Critical Sequence
        TemperatureEvent temp1 = 
                (TemperatureEvent) eventMap.get("temp1");
        // 2nd Temperature in the Critical Sequence
        TemperatureEvent temp2 = 
                (TemperatureEvent) eventMap.get("temp2");
        // 3rd Temperature in the Critical Sequence
        TemperatureEvent temp3 = 
                (TemperatureEvent) eventMap.get("temp3");
        // 4th Temperature in the Critical Sequence
        TemperatureEvent temp4 = 
                (TemperatureEvent) eventMap.get("temp4");

        StringBuilder sb = new StringBuilder();
        sb.append("***************************************");
        sb.append("\n* [ALERT] : CRITICAL EVENT DETECTED! ");
        sb.append("\n* " + temp1 + " > " + temp2 + 
                " > " + temp3 + " > " + temp4);
        sb.append("\n***************************************");

        LOG.debug(sb.toString());
    }

    
}


The Running Demo

Full instructions for running the demo can be found here:

https://github.com/corsoft/esper-demo-nuclear

An example of the running demo is shown below - it generates random Temperature events and sends them through the Esper processor (in the real world this would come in via a JMS queue, http endpoint or socket listener).

When any of our 3 queries detect a match - debug is dumped to the console. In a real world solution each of these 3 listeners would handle the events differently - maybe by sending messages to alert queues/endpoints for other parts of the system to pick up the processing.



Conclusions

Using a system like Esper is a neat way to monitor and spot patterns in data in real time with minimal code.

This is obviously (and intentionally) a very bare bones demo, barely touching the surface of the capabilities available. Check out the Esper web site for more info and demos.

Esper also has a plugin for Apache Camel Integration engine - which allows you to configure you EPL queries directly in XML Spring camel routes, removing the need for any Java code completely (we will possibly cover this in a later blog post!)