DataDome

State management in Apache Flink: How DataDome handles frequent changes in state structure.

tl;dr: In fast development cycles, you will frequently change the structure of Flink operator states. If you’re fine with losing the state of a given operator, you can just change its UID, but this is easy to forget, especially if you’re changing a generic class that is used in several places. DataDome’s R&D team therefore developed a custom TypeSignature, which automatically changes the UID of an operator whenever the structure of its state changes. This ensures that Flink will not try to reload the old state.

Gaël Renoux, Senior developer at DataDome, presented this topic at the 2020 Flink Forward Global Virtual Conference.

The source code is available here under an Apache-2.0 license: https://github.com/DataDome/flink-utils.

Bot detection with Apache Flink

DataDome is a global cybersecurity company. We offer a solution which protects e-commerce and classified ads businesses against all OWASP automated threats: account takeover, web scraping, card cracking, layer 7 DDoS attacks, etc.

To protect our customers’ websites, mobile apps and APIs, we analyze every single request to these endpoints. We extract hundreds of different signals from each request, both server-side and client-side, and use artificial intelligence to detect malicious bot patterns and automate real-time blocking decisions. In total, we process more than a trillion pieces of data every day.

To detect new bot patterns with minimum latency, we run our stream processing on Apache Flink. As we continuously receive and analyze HTTP requests from our customers, we use Flink to update the models that determine whether any given request is made by a human or a bot.

See more: Engineering a low-latency decision loop for bot management

The challenge: frequent changes in state structure

Because we’re dealing with some of the internet’s fastest-changing and most insidious threats, we have a rather hectic development cycle. We add new features and improvements all the time: a week without a deployment to production is almost unheard of.

The way we used to deploy new versions was pretty basic: we would cancel the existing job with a savepoint, deploy the new version from the savepoint, and everything was fine. Except when it wasn’t.

From time to time, the job would crash on restart. Why?

Since you’re reading this, chances are that you’re familiar with this error message:

The state has changed (you renamed something, or added a field), and the new state structure is not compatible with the old state structure.

Two obvious solutions that won’t work

The first fix that might come to mind is to just drop the state and deploy from scratch. Unfortunately, that’s not good enough for our purpose.

Dropping the state would have us running in a somewhat degraded mode for the time it takes to rebuild our states. Besides, most of the time, our upgrade changes just one operator’s state. Here is an illustration of one of our jobs in production, and what a typical change would look like:

Resetting the state of the whole job is overkill. We want to reset the state only for that particular operator, and keep everything else.

The second obvious solution is Flink’s state schema evolution feature. Without going into detail, this won’t work for us either. We’re mostly using Kryo as our serialization engine, which isn’t supported (as of Flink 1.12).

Another way to use this feature would be to transform our state to use only POJO types. Not only would this make our state more verbose (and more susceptible to misuse), it also comes with its own limitations. For instance, it cannot handle renamed fields, and it isn’t great at handling new fields: default values are always the Java defaults (null for objects, 0 for numbers, etc.), not something you can set.

First idea: naive versioning

Our first attempt at a solution was fairly obvious. We know that the state is kept with the UID of the operator as the key, so we decided to just start versioning the keys.
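As a minimal sketch of this naive versioning in plain Scala: the operator name and the `versionedUid` helper are hypothetical, and in a real job the resulting string would be passed to Flink's `uid(...)` on the operator.

```scala
object NaiveVersioning {
  // Hypothetical helper: builds the UID from an operator name and a
  // manually maintained version number.
  def versionedUid(name: String, version: Int): String = s"$name-v$version"

  // The version has to be bumped by hand whenever the operator's
  // state structure changes.
  val sessionStatsUid: String = versionedUid("session-stats", 2)

  // In the job definition, this would be used roughly as:
  //   stream.keyBy(...).process(new SessionStats).uid(sessionStatsUid)
}
```

Note that the savepoint still contains state stored under the old UID, so the new job has to be started with Flink's `--allowNonRestoredState` option for that state to be skipped.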

We did that for a while, and all was well, until an upgraded job crashed again. What happened this time?

Obviously, the state had changed somewhere.

Why hadn’t we changed the version number? Because we were preparing a new feature, adding fields to existing business classes, not touching existing operators.

So what had changed? We had modified a class that was used inside a class that was used inside a class that was, in fact, the state of an existing process-function.

Sure, we should have remembered to update the version number. But we didn’t. So what now? There were two possible ways forward:

  • Decide that we would never make this mistake again. Always double-check if we have to update a version number.
  • Admit that people will make mistakes, and we need to automate this in some way.

Since we’re not robots — heck, we spend most of our waking hours (and some of the sleeping ones) fighting them — we opted for the second approach. We decided to create something that automatically updates the UID of an operator whenever the structure of its state changes.

The basic idea is that the UID now contains a signature of the current state structure. When the structure changes, the signature changes, and so does the UID.

There’s one obvious tradeoff. Since the process is automated, there will be times when a change results in the accidental loss of the state. On some operators, it could be rather a big deal. Fortunately, this risk is relatively easy to prevent: we just need to add a unit test which alerts us whenever the state signature has changed for those important operators, to ensure that it was intentional.
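Such a guard could look roughly like the sketch below. It is only an illustration: the operator name, the signature strings, and the `changed` helper are all hypothetical, and a real test would compare against the signatures actually computed for the job.

```scala
object SignatureGuard {
  // Hypothetical: signatures recorded at the last intentional state change
  // of each important operator.
  val pinned: Map[String, String] = Map(
    "session-stats" -> "Keyed(key=String, ValueState[Stats(count: Long)])"
  )

  // Returns the important operators whose current signature no longer
  // matches the pinned one (or is missing altogether).
  def changed(current: Map[String, String]): Set[String] =
    pinned.keySet.filter(op => !current.get(op).contains(pinned(op)))
}
```

A unit test would then assert that `changed(currentSignatures)` is empty. When it fails, the developer either reverts the accidental change or updates the pinned value on purpose.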

TypeInformation to the rescue?

Our task is now to find a way to automate the UID signature. First idea: use TypeInformation.

TypeInformation is how Flink describes the serialization and deserialization information for a type. In the Scala API, TypeInformation is implicitly available for the type you want. In Java, it’s a bit more verbose, especially if you’re using generic types, but it’s still rather straightforward.

And here’s how we’re going to define the signature for a process-function depending on its state:

So now we have defined a signature method that’s going to automate the UID update for us. Let’s put our method to the test in a couple of different scenarios.

For generic types, our method seems to work well. When we try to create the signature for a List[Population] and a List[Person], we get two different values as expected: the parameter type is taken into account. It also works with custom generic types.

But how about functions with multiple states, either because we have a broadcast state, or because we just have several states? We can just take a signature for a tuple of the different states we want.

However, here we see a first weakness in our method. We haven’t taken into account that state is a ValueState, while state2 is a MapState. We’re just hoping that we’re never going to replace our MapState with a ValueState containing a Key and another one containing a Statistics. Worth noting, but usually not that big of a deal.

We put this TypeInformation-based signature to use, and for a while it worked very well … until another deployment crashed. Here’s the change we made:

What went wrong? It turns out that the signature was the same in both cases… Unfortunately, TypeInformation only works as expected for case classes. If you have a normal class, the TypeInformation will not depend on the fields inside.

As a workaround, we can put what’s inside our class inside the signature itself:

However, that takes us back to square one, sort of. Whenever we make a change inside the Something class, we will have to remember to update MyFunction, which was what we wanted to avoid in the first place.

Our solution: a custom TypeSignature

You know what they say: if at first you don’t succeed … So what’s next? Using an existing mechanism didn’t work, so let’s design a custom TypeSignature.

We want our TypeSignature to fulfill the following three requirements:

  • It must differentiate between the various subtypes of states: we can’t have the same signature for ValueState[Int] and ListState[Int], or for ValueState[List[Int]] and ListState[Int].
  • It must enable writing a custom signature for a class. This is necessary for non-case classes.
  • Ideally, we don’t want to write too much code, because like all good developers we’re kind of lazy that way.
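To make the first requirement concrete, here is a simplified model in plain Scala. The `StateSig` hierarchy is a hypothetical stand-in, not the library's actual types; it only shows that the kind of state has to be part of the signature.

```scala
object StateKinds {
  // Hypothetical, simplified model: the kind of state plus a textual
  // signature of the type(s) it holds.
  sealed trait StateSig { def render: String }

  final case class ValueStateSig(tpe: String) extends StateSig {
    def render: String = s"ValueState[$tpe]"
  }
  final case class ListStateSig(elem: String) extends StateSig {
    def render: String = s"ListState[$elem]"
  }
  final case class MapStateSig(key: String, value: String) extends StateSig {
    def render: String = s"MapState[$key, $value]"
  }
}
```

Because the state kind is rendered explicitly, `ValueStateSig("List[Int]")` and `ListStateSig("Int")` can never produce the same signature, even though both store a list of integers.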

Can we find a way to satisfy all of this? Yes! Here’s how we did it. We’ll go over how we use our custom TypeSignature in the job’s code, and start showing the TypeSignature code only once we’ve seen how it’s intended to be used.

Let’s start with defining a custom type signature for the class Something, which will be used as our state. We’ll make the value implicit, and given it’s on Something’s companion object, it will be automatically found when we look for a TypeSignature[Something].

Don’t worry about the content of the type signature for now; we’ll get to that later.
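The companion-object mechanism can be sketched in plain Scala as follows. The `TypeSignature` trait here is a simplified stand-in (an assumption, not the library's definition), and the fields of `Something` are invented for illustration.

```scala
object CompanionSignature {
  // Simplified stand-in for the library's TypeSignature (assumption).
  trait TypeSignature[T] { def signature: String }

  object TypeSignature {
    // Summons the implicit signature for T.
    def apply[T](implicit ts: TypeSignature[T]): TypeSignature[T] = ts

    def of[T](s: String): TypeSignature[T] =
      new TypeSignature[T] { val signature: String = s }

    implicit val intSig: TypeSignature[Int] = of("Int")
    implicit val stringSig: TypeSignature[String] = of("String")
  }

  // A regular (non-case) class used as operator state; fields are hypothetical.
  class Something(val id: Int, val label: String)

  object Something {
    // Defined on the companion object, so it is in implicit scope for
    // TypeSignature[Something] without any import.
    implicit val typeSignature: TypeSignature[Something] =
      TypeSignature.of(
        s"Something(id: ${TypeSignature[Int].signature}, label: ${TypeSignature[String].signature})"
      )
  }

  // Resolved through the companion object of Something.
  val resolved: String = TypeSignature[Something].signature
}
```

Here `resolved` evaluates to `"Something(id: Int, label: String)"`. Adding or renaming a field means editing the signature a few lines below the class definition, in the same file.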

In the process-function itself, we’re going to define a StateTypeSignature, and this time we’re going to be a bit more explicit about what we’re doing. Instead of just passing the types without any significance attached to them, we’re going to say “okay, we’re creating a StateTypeSignature specifically for a keyed process function, so we have to put a key and the state that’s being used”. And this time, we can specify that the state is for example a ListState. Here it goes:

Again, the StateTypeSignature is implicit and on the companion object, so it’s directly accessible when looking for a StateTypeSignature[MyFunction]. As an added bonus, the builder is type-safe: since MyFunction is a KeyedProcessFunction, if you forget to set the key or the state, the code won’t even compile. If you have no state or want to forego setting it, you have to explicitly call noState.
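One common way to get this kind of compile-time guarantee in Scala is a builder with phantom type parameters. The sketch below illustrates that technique only; it is not the library's builder, and every name in it except `noState` is hypothetical.

```scala
object TypedBuilder {
  // Phantom types recording whether a part of the signature has been provided.
  sealed trait Flag
  sealed trait Unset extends Flag
  sealed trait Done extends Flag

  final class Builder[Key <: Flag, State <: Flag](parts: Vector[String]) {
    // Can only be called while the key is still unset.
    def withKey(sig: String)(implicit ev: Key =:= Unset): Builder[Done, State] =
      new Builder[Done, State](parts :+ s"key=$sig")

    def withListState(sig: String): Builder[Key, Done] =
      new Builder[Key, Done](parts :+ s"ListState[$sig]")

    // Explicit opt-out for functions that keep no state.
    def noState(implicit ev: State =:= Unset): Builder[Key, Done] =
      new Builder[Key, Done](parts :+ "noState")

    // Only compiles once both the key and the state have been set.
    def build(implicit keySet: Key =:= Done, stateSet: State =:= Done): String =
      parts.mkString("Keyed(", ", ", ")")
  }

  def keyedProcessFunction: Builder[Unset, Unset] =
    new Builder[Unset, Unset](Vector.empty)
}
```

With this sketch, `keyedProcessFunction.withKey("String").withListState("Something").build` compiles, while `keyedProcessFunction.withKey("String").build` is rejected by the compiler because no evidence `Unset =:= Done` exists.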

Finally, all you need in the job definition is to call uidStated (an extension method on the DataStream) to set a UID and a type, and it will implicitly load the correct StateTypeSignature to complete it:
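The sketch below imitates this mechanism without any Flink dependency. `FakeStream` is a hypothetical stand-in for the DataStream, and the parameter list of `uidStated` as well as the way the signature is folded into the UID (a hex hash suffix) are assumptions, not the library's actual behavior.

```scala
object UidExtension {
  // Stand-ins (assumptions): a minimal "stream" and the signature type class.
  final case class FakeStream[F](function: F, uid: Option[String] = None)
  trait StateTypeSignature[F] { def signature: String }

  class MyFunction
  object MyFunction {
    // Found implicitly whenever a StateTypeSignature[MyFunction] is required.
    implicit val stateSignature: StateTypeSignature[MyFunction] =
      new StateTypeSignature[MyFunction] {
        val signature: String = "Keyed(key=String, ListState[Something])"
      }
  }

  implicit class StatedOps[F](stream: FakeStream[F]) {
    // Appends a short hash of the state signature to the readable name.
    def uidStated(name: String)(implicit sig: StateTypeSignature[F]): FakeStream[F] =
      stream.copy(uid = Some(s"$name-${Integer.toHexString(sig.signature.hashCode)}"))
  }

  // The caller only supplies the readable name; the signature is resolved
  // from MyFunction's companion object.
  val stamped: FakeStream[MyFunction] =
    FakeStream(new MyFunction).uidStated("my-function")
}
```

The point of the design is that the job definition never mentions the state structure; any change to the signature silently produces a different UID.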

Now, back to what we touched upon earlier: how do we create the TypeSignature?

Well, first, we can have the TypeInformation as a backup. After all, it did work for case classes. So, we can create a TypeSignature based on the TypeInformation.

For a complex class, the TypeSignature must be built manually. We’ll have to specify all the needed types, and implicitly load the TypeSignatures for them. This could be improved with a macro, but that’s still in the future.

For all basic types (Boolean, Int, String…) as well as basic collections and tuples, implicit type signatures are provided. We also provide type signatures for all case classes (using Shapeless and type derivation), assuming of course all fields in the case class have their own implicit type signature.
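As a rough illustration of how such defaults compose, here is a plain-Scala sketch using the same simplified `TypeSignature` stand-in (an assumption, not the library's code). It covers only a few basic types, lists, and pairs; the Shapeless-based derivation for case classes is not shown.

```scala
object DefaultSignatures {
  // Simplified stand-in for the library's TypeSignature (assumption).
  trait TypeSignature[T] { def signature: String }

  object TypeSignature {
    def apply[T](implicit ts: TypeSignature[T]): TypeSignature[T] = ts

    private def of[T](s: String): TypeSignature[T] =
      new TypeSignature[T] { val signature: String = s }

    // Basic types.
    implicit val booleanSig: TypeSignature[Boolean] = of[Boolean]("Boolean")
    implicit val intSig: TypeSignature[Int] = of[Int]("Int")
    implicit val stringSig: TypeSignature[String] = of[String]("String")

    // Collections and tuples are derived from the signatures of their parts.
    implicit def listSig[A](implicit a: TypeSignature[A]): TypeSignature[List[A]] =
      of[List[A]](s"List[${a.signature}]")

    implicit def tuple2Sig[A, B](implicit a: TypeSignature[A], b: TypeSignature[B]): TypeSignature[(A, B)] =
      of[(A, B)](s"(${a.signature}, ${b.signature})")
  }
}
```

With these in scope, `TypeSignature[List[(Int, String)]].signature` resolves to `"List[(Int, String)]"` without any hand-written signature.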

You can see the source code for these default signatures on GitHub.

With all of that, we don’t even need a specific TypeSignature for most cases, because it’s all available as a default implicit. For example, in the code below we have implicit TypeSignatures for Person, for List[Person], for a tuple of Person and String, etc.

Summary

To wrap up, here’s a quick summary of what we did to automatically change the UID of an operator whenever there is a change in the state structure.

Define any custom TypeSignature in the companion object of the class: this means that whenever we modify the class, the TypeSignature is just there, in the same file, right under our nose. It’s easy to remember to update it.

Define the process-function’s StateTypeSignature using only the locally visible state: if we change something in the process-function, we know we need to change the StateTypeSignature. If we change something in other classes, we don’t have to worry about the StateTypeSignature of any process-function.

Mark the operator as stateful with the uidStated method in the job definition.

To illustrate how it works on a simple change:

  • We have our class Something, where we just added the field name, and our class Person, where we added the field nickname.
  • Adding the field name in Something will modify the custom TypeSignature of Something.
  • Adding the nickname in Person will modify the implicit TypeSignature for Person, which will in turn modify the TypeSignature of Something.
  • The change in Something’s type signature will modify the StateTypeSignature for MyFunction, which will change the operator’s UID.
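The chain above can be traced with a small plain-Scala sketch. Signatures are modeled as plain strings here, and the field names other than name and nickname (such as `owner` and `firstName`) are hypothetical.

```scala
object ChangePropagation {
  // Each signature is built from the signatures of the parts it contains.
  def personSig(fields: List[String]): String =
    fields.mkString("Person(", ", ", ")")

  def somethingSig(fields: List[String], person: String): String =
    (fields :+ s"owner: $person").mkString("Something(", ", ", ")")

  def stateSig(something: String): String =
    s"Keyed(key=String, ListState[$something])"

  val before: String =
    stateSig(somethingSig(List("id: Int"), personSig(List("firstName: String"))))

  // Only Person changes: the nickname field is added.
  val personChanged: String =
    stateSig(somethingSig(List("id: Int"), personSig(List("firstName: String", "nickname: String"))))

  // Both changes: name in Something, nickname in Person.
  val after: String =
    stateSig(somethingSig(List("id: Int", "name: String"), personSig(List("firstName: String", "nickname: String"))))
}
```

All three state signatures differ, so a UID derived from them differs as well, even when the only edit was made in Person.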

Conclusion: any single change in any of these classes is guaranteed to change the UID of the operator.

Again, the full source code is available under an Apache-2.0 license: https://github.com/DataDome/flink-utils. We invite you to explore it to learn more about how this works.

Good luck!
