Real-time analytics with Apache Storm – now in F#

Over the past several months I’ve been prototyping various aspects of an IoT platform – or, more specifically, exploring the concerns of “soft” real-time handling of communications with potentially hundreds of thousands of devices.

Up to this point, being in the .NET ecosystem, I’ve been building distributed solutions with a most excellent lightweight ESB – MassTransit – but for IoT we wanted to be a little closer to the wire. Starting with a clean slate and having discovered Apache Storm and Nathan’s presentation, I realized that it addresses exactly the challenges we have.

It appears to be the ultimate reactive microservices platform for the lambda architecture: it is fairly simple and fault-tolerant overall, yet embraces fire-and-forget and “let it fail” at the component level.

While Storm favours the JDK for development, has extensive component support for Java developers and is heavily optimized for executing JVM components, it also supports “shell” components via its multilang protocol – which, unlike Spark, is what makes it interesting for a .NET developer.

Looking for a .NET library to implement Storm components, there is Microsoft’s implementation – unfortunately, components in C# end up looking rather verbose, and it works exclusively with HDInsight/Azure, which is a deal breaker for us, as we want our customers to be able to run it anywhere. Fortunately, further search revealed the recently open-sourced FsStorm, announced on Faisal’s blog, and I liked it at first sight: the concise F# syntax for components and the DSL for defining topologies make authoring a simple and enjoyable process.

FsStorm components can be just a couple of lines of F#, are mostly statically verified, and have a clear lifecycle and an easy-to-grasp concurrency story. And with F# enjoying first-class support on Mono, we are able to run Storm components effectively both on dev Windows boxes and on distributed Linux clusters, while capitalizing on the productivity and the wealth of the .NET ecosystem.
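
To give a flavour of that conciseness, here is a rough sketch of a spout and a bolt as plain F# functions. This is not the actual FsStorm DSL – the `emit` callbacks and the function shapes are hypothetical, so check the FsStorm docs for the real API – but the amount of code involved is about right:

```fsharp
// Illustrative sketch only – not the actual FsStorm API.
// 'emit' is a hypothetical callback supplied by the runtime to push a tuple downstream.

// A "spout" that produces sentences.
let sentenceSpout (emit: string -> unit) =
    let sentences = [| "the quick brown fox"; "jumped over the lazy dog" |]
    let rnd = System.Random()
    fun () -> emit sentences.[rnd.Next(sentences.Length)]

// A "bolt" that splits sentences into words and emits running counts.
let wordCountBolt (emit: string * int -> unit) =
    let counts = System.Collections.Generic.Dictionary<string, int>()
    fun (sentence: string) ->
        for word in sentence.Split(' ') do
            counts.[word] <- (match counts.TryGetValue word with | true, n -> n | _ -> 0) + 1
            emit (word, counts.[word])
```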

It is now available under the FsStorm umbrella as a NuGet package, with CI, a gitter chatroom and a bit of documentation.

While it is still in its early days, with significant changes on the horizon – something I want to tackle soon is static schema definitions for streams and pluggable serialization, with Protobuf by default – I believe it is ready for production, so go forth and “fork me on GitHub”!


Arriving at Scalability – bandwidth management

No matter how fast and vast your infrastructure, at some point you find that it’s not enough or that it’s not used efficiently. The messages make sense and the endpoints are configured in an optimal way; the problem becomes the updates themselves – it’s expensive to carry out many little updates to the state without sacrificing the ACID properties.

No matter how fast an endpoint can process messages, it’s possible to receive more than it can handle – the messages start to pile up. That’s OK if you know a reprieve is coming, but what if it’s not?

Batching to the rescue: we have to make effective use of the bandwidth we have on any given endpoint, and sometimes that means repackaging the messages into something that carries the summarized bulk down to the actual updates.

The approach we’ve found simple and effective is to use some kind of persistent Event Store to keep all the message information and then, based on a trigger (a timer or otherwise), group and aggregate the events before proceeding with the domain model updates.
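
As a rough illustration (the `Event` shape, `readPending` and `applyToDomain` below are placeholders invented for the example), the trigger handler folds the pending events per entity and then performs one summarized update instead of many little ones:

```fsharp
// Illustrative sketch: aggregate pending events per entity before updating the domain.
type Event = { EntityId: int; Delta: decimal }   // hypothetical event shape

// readPending   : unit -> Event list        (reads from the persistent event store)
// applyToDomain : int -> decimal -> unit    (performs one ACID update per entity)
let onTrigger (readPending: unit -> Event list) (applyToDomain: int -> decimal -> unit) =
    readPending ()
    |> List.groupBy (fun e -> e.EntityId)
    |> List.iter (fun (entityId, events) ->
        // one summarized update instead of one update per event
        let total = events |> List.sumBy (fun e -> e.Delta)
        applyToDomain entityId total)
```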

We freed up the endpoint and the storage, and we avoided introducing the kind of conflict resolution inherent to eventual consistency models for dealing with updates at scale.


Where does my domain logic go?

I sometimes see debates sparking over smart UIs and where domain logic should go. My view has always been that your domain logic is everywhere: user experience is as much part of your domain as the logical structure of your database. But what I’ve noticed is that there’s an interesting fluidity that work on Scalability exposes: certain domain logic is sometimes a Presentation concern, sometimes an Application Services concern, and sometimes both.

A good counter-example is validation: you really want to give the user feedback early and often, so is it a Presentation concern? Where do error messages get defined? The answer in both cases is that it belongs in the Domain layer, but it has to be sufficiently abstract to allow for the flexibility that Presentation requires. I see this done wrong all the time, with fancy frameworks and a lot of pride taken in doing it, but that’s for another post.
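
One way to picture “sufficiently abstract” (the `Rule` and `ValidationError` types below are invented for illustration) is to declare the rules and their error messages in the Domain layer as data, so that Presentation can evaluate them eagerly for immediate feedback and Application Services can re-check them before persisting:

```fsharp
// Illustrative sketch: validation declared once in the domain, consumable by any layer.
type ValidationError = { Field: string; Message: string }     // error text lives in the domain
type Rule<'T> = { Check: 'T -> bool; Error: ValidationError }

type Customer = { Name: string; Email: string }

// Rules are defined in the Domain layer...
let customerRules : Rule<Customer> list =
    [ { Check = (fun c -> c.Name <> "");         Error = { Field = "Name";  Message = "Name is required" } }
      { Check = (fun c -> c.Email.Contains "@"); Error = { Field = "Email"; Message = "Email looks invalid" } } ]

// ...and both Presentation (immediate feedback) and Application Services (before persisting)
// evaluate the same rules.
let validate rules candidate =
    rules |> List.filter (fun r -> not (r.Check candidate)) |> List.map (fun r -> r.Error)
```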

Going back to the original point, Scalability requires that you do as much work as possible when the user is not looking. That tends to shift a lot of what would normally go into Presentation into the Application Services layer and onto a different tier. That means having a separate “bounded context” within your domain layer to keep the state that the Presentation layer needs for immediate and fast consumption.

This is not unexpected, but you don’t think about it until you have to serve a lot of domain logic quickly.

So the answer is – it’s everywhere, with different aspects captured in different layers.


Arriving at Scalability – part 3

This is part 3 of the series, started in part 1 and continued in part 2.

One of the things that becomes obvious when tackling Scalability is that calculating certain things on the fly takes too long to be practical. Another obvious thing is that the logic that deals with data needs to be executed somewhere close to the data.

Denormalization

By structuring the data in such a way that we can hold on to the results of a calculation, we can take advantage of the cloud processing capabilities we have on the backend. We end up with copies of many things, but by partitioning the data into Aggregates we are free to modify any bits without locking issues. It also opens the door to further distribution – if you have your own copy, it doesn’t matter where you work on it. The interested parties, such as the UI, will eventually become consistent, all along returning a cached copy of the data.
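
A minimal sketch of the idea (the `OrderPlaced` event and the `OrderSummary` read model are made up for the example): instead of recalculating totals at query time, a backend service keeps a per-customer summary up to date, and the UI only ever reads that copy:

```fsharp
// Illustrative sketch: a denormalized read model kept alongside the transactional data.
type OrderPlaced = { CustomerId: int; Amount: decimal }

// The precomputed copy the UI reads from – no on-the-fly aggregation at query time.
type OrderSummary = { CustomerId: int; OrderCount: int; TotalSpent: decimal }

let apply (summaries: Map<int, OrderSummary>) (e: OrderPlaced) =
    let current =
        match summaries.TryFind e.CustomerId with
        | Some s -> s
        | None -> { CustomerId = e.CustomerId; OrderCount = 0; TotalSpent = 0m }
    summaries.Add(e.CustomerId,
                  { current with OrderCount = current.OrderCount + 1; TotalSpent = current.TotalSpent + e.Amount })
```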

Event-Driven Services

Introducing copies of the data means we need to know when to update them. By communicating via messages that represent domain events taking place, we let our services work within their narrow scope with their own copy of the data. Once they modify their little part of the domain, all they have to do is notify the parties that depend on it with the particulars of what was done.
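
In code that tends to look something like the following sketch, where `save` and `publish` are placeholders for persisting the service’s own copy of the data and handing the event to the bus:

```fsharp
// Illustrative sketch: a service modifies its own slice of the domain, then announces it.
type InvoiceStatus = Open | Paid
type Invoice = { Id: int; Status: InvoiceStatus }
type InvoicePaid = { InvoiceId: int; PaidOn: System.DateTime }

// save    : persists this service's copy of the data
// publish : hands the domain event to the bus, fire-and-forget
let handlePayment (save: Invoice -> unit) (publish: InvoicePaid -> unit) (invoice: Invoice) =
    let paid = { invoice with Status = Paid }
    save paid
    publish { InvoiceId = paid.Id; PaidOn = System.DateTime.UtcNow }
```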

Push notifications for UI

The UI becomes just another publisher and subscriber of business events, triggering the changes and minimizing the reads. The delay between a change taking place and the UI reflecting it has to be watched, but by computer standards humans are slow. We read and write at a glacial pace, and while the computers carry out all this eventing, processing and copying of the data, a human would barely make a click or two.

Batching

Taking a lot of data in and promising to get back with the results via asynchronous means is another thing made possible once you embrace fire-and-forget methods of communication. By looking at a whole batch, we can employ more intelligent strategies for resource acquisition and aggregate the events, enabling all the parties involved to do their thing more efficiently.
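
For example (a sketch only – `openConnection` and `writeAll` stand in for whichever expensive resource is being acquired), a handler that receives the whole batch can acquire the resource once and write one aggregated result per device:

```fsharp
// Illustrative sketch: acquire the expensive resource once per batch, not once per message.
type Reading = { DeviceId: int; Value: float }

let processBatch (openConnection: unit -> System.IDisposable)
                 (writeAll: System.IDisposable -> (int * float) list -> unit)
                 (batch: Reading list) =
    use conn = openConnection ()                // one acquisition for the whole batch
    batch
    |> List.groupBy (fun r -> r.DeviceId)
    |> List.map (fun (deviceId, rs) -> deviceId, rs |> List.averageBy (fun r -> r.Value))
    |> writeAll conn                            // one aggregated write per device
```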

Putting it all together, we can take our scale-out efforts pretty far: if a particular calculation is very demanding, we can put the service carrying it out on a separate machine and it’s not going to affect anything else. This is very powerful, but eventually we’ll hit a wall again – even within a narrow scope we’ll accumulate too much data. The data we have partitioned “vertically” will have to be partitioned “horizontally”. It’s a big challenge, but also the “holy grail” of scalability, and we have some ideas about the approach – maybe one day I’ll be able to tell about that as well.


Arriving at Scalability – part 2

Several decisions we had made earlier became powerful enablers for our first pass at Scalability.


Queryable Repository

Data reads take time, and reading less than everything is always a good idea. Implementing Repositories as queryable allowed us to switch to paged, ordered and generally highly selective views.
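
The shape of such a repository might look like the sketch below; the `IRepository` interface and the `Customer` type are illustrative rather than our actual code:

```fsharp
// Illustrative sketch: a repository that exposes IQueryable, so callers can filter,
// order and page before anything is materialized.
open System.Linq

type Customer = { Id: int; Name: string; IsActive: bool }

type IRepository<'T> =
    abstract Query : unit -> IQueryable<'T>

// A paged, ordered, highly selective view built on top of the queryable repository.
let activeCustomersPage (repo: IRepository<Customer>) (pageIndex: int) (pageSize: int) =
    query {
        for c in repo.Query() do
        where (c.IsActive)
        sortBy c.Name
        skip (pageIndex * pageSize)
        take pageSize
    } |> Seq.toList
```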


Unit of Work

Effectively building small projections for the UI from large datasets was possible thanks to a request-scoped Unit of Work implementation, which enables the underlying ORM to read data across multiple Repositories.
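
Roughly speaking (the interfaces below are simplified placeholders for what the ORM actually provides), a request-scoped Unit of Work means every Repository touched during a request shares the same session, and the whole thing commits and disposes together:

```fsharp
// Illustrative sketch: one Unit of Work (ORM session) per request, shared by all repositories.
type IUnitOfWork =
    inherit System.IDisposable
    abstract Commit : unit -> unit

type ICustomerRepository =
    abstract FindName : int -> string

type IOrderRepository =
    abstract FindByCustomer : int -> string list

// Both repositories are created over the same unit of work for the duration of the request,
// so a projection can read across them in one session.
let handleRequest (createUow: unit -> IUnitOfWork)
                  (customers: IUnitOfWork -> ICustomerRepository)
                  (orders: IUnitOfWork -> IOrderRepository)
                  (customerId: int) =
    use uow = createUow ()
    let projection = (customers uow).FindName customerId, (orders uow).FindByCustomer customerId
    uow.Commit()
    projection
```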


Aggregate Roots

An Aggregate Root is a cluster of associated objects treated as a unit for the purpose of data changes. By implementing Repositories one per aggregate, we isolated the data affected by transactions and avoided deadlocks. With a bit of per-ID synchronization we were able to avoid optimistic concurrency exceptions as well.
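
The per-ID synchronization can be as simple as the following sketch – a gate object per aggregate ID, so writes to the same aggregate are serialized while different aggregates proceed in parallel (a real implementation would also evict unused gates):

```fsharp
// Illustrative sketch: serialize updates per aggregate ID.
open System.Collections.Concurrent

// One gate object per aggregate ID.
let private gates = ConcurrentDictionary<int, obj>()

let withAggregateLock (aggregateId: int) (update: unit -> unit) =
    let gate = gates.GetOrAdd(aggregateId, obj())
    lock gate update
```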


Message passing

By integrating message passing early, we were able to move all the writes to backend services and avoid distributed locks in exchange for eventual consistency. Moving the backend services to another machine after this was trivial.
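
A simplified, in-process sketch of the publish/subscribe surface the frontend writes against (a real service bus adds guaranteed delivery, transports and subscription management on top of this):

```fsharp
// Illustrative sketch: writes are published as messages; backend subscribers do the work.
type IServiceBus =
    abstract Publish<'T> : 'T -> unit
    abstract Subscribe<'T> : ('T -> unit) -> unit

type CreateWorkItem = { Title: string; RequestedBy: string }

// Frontend: fire-and-forget the write...
let requestWorkItem (bus: IServiceBus) (title: string) (user: string) =
    bus.Publish { Title = title; RequestedBy = user }

// Backend: ...and a subscriber, possibly on another machine, performs it.
let startWorkItemService (bus: IServiceBus) (persist: CreateWorkItem -> unit) =
    bus.Subscribe<CreateWorkItem>(fun cmd -> persist cmd)
```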


And that’s how, by scaling out, we were able to get that x100 performance gain on the cheap. We’ll do it again a few months later by batching, denormalizing the data and embracing Event-Driven SOA.


Arriving at Scalability

This is going to be a series of posts exploring my revelations stemming from the last couple of years designing a SaaS solution for IT organizations.

Probably like most startups, or even most new product developments, we set out to deliver the features. Scalability was one of the desired quality attributes, but considering how prioritizing one attribute affects all the others, it wasn’t on top. Moreover, knowing about experiences at places like MySpace, it was a given that we would end up rewriting some parts of the system with every increase in the order of magnitude of user transactions. As our understanding of our domain and our users improves, and as usage metrics become available, we’ll figure out what needs to change to get the next x10.

With this in mind, we set out to work with these priorities for the code:

  • Small and easy to maintain
  • Secure
  • RAD-supported, rich UI with dynamic query capabilities
  • Low friction data access
  • Low coupling between interacting bits

Simple. No Scalability here. Leaving aside Security, this is what it translated into:

  • Lightweight, POCO domain model with Root Aggregates
  • Unit tests and Continuous Integration
  • Silverlight with 3rd party controls and RIA services
  • An ORM and SQL Server backend
  • Message passing with guaranteed delivery

Here is where it gets interesting, with some of the patterns we used in implementation:

  • A Repository capable of carrying out dynamic queries
  • Unit of work
  • Dependency injection
  • Lightweight service bus with publisher/subscriber

At this point we’re in our second or third month of development, with the staging environment getting ready for testing on EC2. Every developer has a complete system running on their machine. We handle the entire projected load while running on a single virtual machine and we haven’t even had to make any sacrifices or do anything in terms of optimizations.

These may or may not seem impressive, but it felt good to get some stuff done and done right. All along, the features are the focus, and we can handle several concurrent users and hundreds of data entities.

Next I’ll talk about our first Scalability endeavour and how relatively small an effort it was for an x100 gain.


Continued in Part 2

And concluded in Part 3
