
Real-Time Pattern Recognition and Processing with Apache Flink

August 28th, 2024

Author
Max Lahlou

One of the greatest challenges in generating insights from datasets is separating unexplained fluctuations - noise - from quantifiable signals. The problem becomes even harder to solve in real time with high-frequency time-series data, which can arrive at second or even millisecond resolution.

These are challenges that we come across daily in our work at Ferry. Our mission is to be the operating system that the world’s factories run on, and manufacturing is one of the largest data generators out there. One production line alone can produce 4 trillion data points a year!

So what approaches can we take to manage large-scale, noisy time-series datasets and extract insightful metrics? In this article, we show how to use Apache Flink - in particular, its MATCH_RECOGNIZE function - to identify the patterns that indicate good measurements.

A real-life case study: yield management

One of the common applications built on Ferry is real-time yield management. Wastage is a critical lever for improving profitability, and a common source of this kind of leakage is overfilling packages and containers with product. Tracking yield in live production - that is, the weight of product in each package - therefore helps manufacturers ensure that they are not losing product by overfilling packaging above target weights (or worse, underfilling it!).

The way to tackle this problem is two-fold: part machinery, part software. To physically weigh packages, manufacturers often use a piece of machinery called a checkweigher. Think of a checkweigher as a conveyor belt with scales underneath: as bags flow along the line, their weights can be recorded.

Note: sometimes checkweighers are designed for rejection - that is, if a package is underfilled, it will be discarded from the line. Our objective here is primarily the inverse - assessing overfilling of packages. Therefore, we do not need to measure every package, but rather a large enough sample of packages to identify yield losses.

This sounds simple enough, but this type of set-up often leads to extremely high-frequency data with a lot of noise. Here are a few reasons why:

  1. Non-constant number of packages on the line.

    The checkweigher's measurement is affected by the weight of everything on the production line. Since the line is live, the number of packages on it at any moment could be none, one, or several. An additional factor is that the measured weight of a filled package depends on where it sits on the line; a package that is being loaded onto or unloaded off the line can register less than its actual weight. These fluctuations are clearly visible in Figure 1, where the measurements regularly peak and trough.

  2. Checkweigher taring issues. As with any weight scale, it is important to regularly tare the checkweigher. The taring process is automatic, and errors can occur if the checkweigher tares while there are filled packages on the line.

    • Taring a checkweigher involves setting the scale to zero with an empty container or packaging in place, allowing for accurate measurement of the net weight of the product being weighed without including the weight of the container.

  3. Checkweigher Gaussian error. The checkweigher has some intrinsic error in each measurement. This is usually not a problem since the intrinsic fluctuations are usually on the order of 10 grams. For most applications, this can be considered negligible.

  4. Fluctuations in the filled package weights themselves. This is the variance that we are most interested in quantifying. The causes of these errors are highly dependent on the specifics of the production line. Some example sources of weight variance are unoptimized tuning of machinery, human error in the manufacturing process, and unaccounted changes in underlying components.

Figure 1: Recorded weight values (in grams) versus time of measurement

The fourth source of variance is the metric that we want to measure as this directly affects product yield, and could point out potential inefficiencies in the production line. However, the data displayed in Figure 1 is affected by the combination of all four fluctuations. How can we collect data specific to the actual fluctuations in the filled package weights themselves?

Sources 2 and 3 can be accounted for using outlier rejection and Bayesian statistics (which will be a topic for another article). The fluctuating number of bags on the line can be accounted for with pattern recognition directly in Apache Flink via MATCH_RECOGNIZE, which will be the focus of the rest of this article.

Apache Flink and MATCH_RECOGNIZE

Apache Flink is an open-source stream processing framework designed for large-scale data processing, and is a perfect framework for many of Ferry’s pipelines. It allows us to process real-time data streams and batch data with high throughput and low latency. Flink's powerful engine also supports event-driven applications, stateful computations, and complex data transformations, making it an ideal choice for scenarios where real-time insights and decision-making are critical.

Flink is particularly useful for continuously processing vast amounts of data efficiently. Its ability to handle both stream and batch processing in a unified manner, combined with features like fault tolerance, scalability, and exactly-once processing guarantees, makes it a versatile tool for building robust, high-performance data processing pipelines.

MATCH_RECOGNIZE is a powerful feature in Flink that allows for detecting complex patterns in data streams. This function can recognize sequences of events in a stream and record specific metrics over those events. The following section gives a detailed introduction to the mechanics of MATCH_RECOGNIZE as well as a basic use case.

Prerequisites

To use MATCH_RECOGNIZE in Apache Flink, you will need the following:

  1. Apache Flink installed

  2. Java Development Kit (JDK) 8 or higher

  3. A text editor or Integrated Development Environment (IDE)

Syntax

MATCH_RECOGNIZE (
  PARTITION BY <expr>
  ORDER BY <expr>
  MEASURES
    <expr> AS <alias>
  <output_type>
  PATTERN (<pattern>)
  DEFINE
    <symbol> AS <condition>
)
  1. PARTITION BY: partitions the table

    • similar to the GROUP BY clause in SQL

  2. ORDER BY: what values to order incoming rows by

    • see the SQL documentation for ORDER BY

  3. MEASURES: defines the output of the clause

    • similar to a SELECT clause in SQL

  4. OUTPUT_TYPE: defines the number of output rows for each match (all or one)

    • ONE ROW PER MATCH - the table contains one row corresponding to all inputs matching the pattern; values must be aggregated across rows in a pattern

    • ALL ROWS PER MATCH - the table contains all rows corresponding to the inputs matching the pattern

  5. PATTERN: allows constructing patterns that will be searched for using a regular expression-like syntax

  6. DEFINE: defines the conditions that the pattern variables must satisfy

    • define each pattern variable using standard SQL expressions

Basic Example: Cointegrated Stock Data

Suppose you are trading stocks, and want to analyze two stocks with cointegrated prices. You might be interested in the spread between these prices, and in particular when these prices diverge by a certain amount. With MATCH_RECOGNIZE, we can identify these patterns of interest, and calculate valuable metrics to quantify them.

Data

In this stylized example, we have two time-series datasets, one for the price of each stock. They are cointegrated such that the difference in price between the two stocks is stationary with expected value 0. The simulated prices are plotted in Figure 2.

Figure 2: Simulated price data for two cointegrated stocks
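
If you would like to generate data along these lines yourself, below is a minimal sketch in Python. It is not the exact series behind Figure 2; the tick count, volatilities, and AR(1) coefficient are illustrative choices. Stock B is constructed as Stock A plus a mean-reverting spread, so the difference between the two prices is stationary around 0.

import numpy as np

rng = np.random.default_rng(42)
n_ticks = 1_000

# Stock A follows a random walk around $100
price_a = 100 + np.cumsum(rng.normal(0, 0.5, n_ticks))

# Stock B tracks A plus a mean-zero AR(1) spread, so B - A is stationary
spread = np.zeros(n_ticks)
for t in range(1, n_ticks):
    spread[t] = 0.95 * spread[t - 1] + rng.normal(0, 1.0)
price_b = price_a + spread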

Patterns and Metrics

Let us refer to the absolute difference in price between Stock A and Stock B as the spread. Since the spread is expected to be 0, the patterns we want to identify are ticks corresponding to times when the prices diverge by a substantial amount. In this simple example, we want to identify periods where the spread is greater than $5.

Figure 3: Spread between the prices of the two cointegrated stocks. The horizontal line corresponds to a spread of $5.

A useful metric to measure might be the number of ticks in each instance of the pattern, as the time-length of signals is often incorporated into the development of execution strategies.

Example Implementation

Using Flink’s MATCH_RECOGNIZE, we can find patterns, calculate metrics, and save results of a live flow of data. Consider a live flow of price data, assembled into a table titled “prices,” with columns “A” and “B” corresponding to the prices of stock A and stock B, respectively. We can use the following code to collect our desired metrics:

SELECT 
  start_ts, 
  end_ts, 
  num_ticks
FROM prices
MATCH_RECOGNIZE (
  ORDER BY event_time
  MEASURES
      FIRST(P.event_time) AS start_ts,
      LAST(P.event_time) AS end_ts,
      COUNT(P.A) as num_ticks
  ONE ROW PER MATCH
  AFTER MATCH SKIP PAST LAST ROW
  PATTERN (P+)
  DEFINE
      P AS (P.A IS NOT NULL AND P.B IS NOT NULL) AND
          (ABS(P.A - P.B) > 5)
);

Below is a sample output of this query with a simulated flow of data in Python:

Figure 4: Sample output with data simulated at 5 ticks per second (see below for more information about simulated flows in Python).

Code Explanation

  1. MEASURES:

    • start_ts: record the start of a pattern

    • end_ts: record the end of a pattern

    • num_ticks: record the number of ticks in an identified pattern

  2. OUTPUT_TYPE: ONE ROW PER MATCH

    • summarize each identified segment with a single row

  3. PATTERN:

    • greedily search for one or more consecutive rows matching P (P+), so every tick within a divergence is included in the match

  4. DEFINE:

    • P is defined as any row where both prices A and B are not Null, and where the spread between them is greater than $5

This is a very simple example of how MATCH_RECOGNIZE can be used with live data, and only offers a glimpse into the possibilities for live data analysis using Apache Flink. Below, we will discuss using MATCH_RECOGNIZE with real data in a live data flow.

Stable Weight Measurements With MATCH_RECOGNIZE

Let us return now to the original problem at hand: how to collect good weight measurements from a checkweigher on a production line. Referring back to Figure 1, we can see that the weights measured by the checkweigher momentarily stabilize when the number of bags on the line is unchanging. We can define these spots as areas where the difference between adjacent points is smaller than some defined threshold. In this example, a threshold of 10 grams works well. Since MATCH_RECOGNIZE takes rows as inputs, let us add a column with weight measurements at the previous tick. This can easily be done using SQL’s LAG function, with which we can gather all the necessary columns into a new table titled “difference_table:”

WITH difference_table as(
        SELECT 
            weight,
            event_time,
            LAG(weight, 1) over (order by event_time) as prev_weight
        FROM
            input_table
    )

Now we will construct the MATCH_RECOGNIZE call to identify these points of good data and take the average value of each segment. This averaging has the added benefit of damping the checkweigher’s intrinsic measurement noise: for roughly independent errors, averaging n points shrinks their standard deviation by a factor of about √n. Assuming the checkweigher is properly tared, this results in unbiased measurements.

First we define the desired pattern as points within 10g of the previous measurement, and also ensure that points are not Null:

DEFINE
	A AS A.weight IS NOT NULL AND 
	  (A.prev_weight IS NOT NULL AND ABS(A.weight - A.prev_weight) < 10)

In each pattern, we calculate the average value (i.e., our metric of interest), and also keep track of timestamps and number of points for debugging and visualization purposes:

MEASURES
    AVG(A.weight) AS avg_weight,           -- Calculate the average weight
    FIRST(A.event_time) AS start_ts,       -- Record the start timestamp
    LAST(A.event_time) AS end_ts,          -- Record the end timestamp
    COUNT(A.weight) AS num_measurements    -- Record the number of ticks

We want every instance of this pattern to be recorded, so we specify a greedy pattern search over one or more points, hence the + in A+. We only want one row to summarize each match, so we specify ONE ROW PER MATCH. Finally, since we want each group of stable points to be independent of the others, we include AFTER MATCH SKIP PAST LAST ROW. (The trailing C in the pattern below is a workaround for a Flink limitation on greedy quantifiers, explained later in this article.) The entire MATCH_RECOGNIZE call is as follows:

SELECT avg_weight, start_ts, end_ts, num_measurements
FROM difference_table
MATCH_RECOGNIZE (
    ORDER BY event_time
    MEASURES
        AVG(A.weight) AS avg_weight,           -- Calculate the average weight
        FIRST(A.event_time) AS start_ts,       -- Record the start timestamp
        LAST(A.event_time) AS end_ts,          -- Record the end timestamp
        COUNT(A.weight) AS num_measurements    -- Record the number of ticks
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (A+ C)
    DEFINE
        A AS A.weight IS NOT NULL AND
            (A.prev_weight IS NOT NULL AND ABS(A.weight - A.prev_weight) < 10)
);

Putting this all together, we get a succinct piece of code:

WITH difference_table as(
        SELECT 
            weight,
            event_time,
            LAG(weight, 1) over (order by event_time) as prev_weight
        FROM
            input_table
    )
SELECT avg_weight, start_ts, end_ts, num_measurements
FROM difference_table
MATCH_RECOGNIZE (
    ORDER BY event_time
    MEASURES
        AVG(A.weight) AS avg_weight,           -- Calculate the average weight
        FIRST(A.event_time) AS start_ts,       -- Record the start timestamp
        LAST(A.event_time) AS end_ts,          -- Record the end timestamp
        COUNT(A.weight) AS num_measurements    -- Record the number of ticks
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (A+ C)
    DEFINE
        A AS A.weight IS NOT NULL AND
            (A.prev_weight IS NOT NULL AND ABS(A.weight - A.prev_weight) < 10)
);

where:

  1. difference_table: table containing tick data of weights, timestamps, and a 1 tick lag of weights

  2. MEASURES:

    • avg_weight: aggregate weights via averaging

    • start_ts: record the start of a pattern

    • end_ts: record the end of a pattern

    • num_measurements: record the number of datapoints in a pattern

  3. OUTPUT_TYPE: ONE ROW PER MATCH

  4. PATTERN:

    • greedily match one or more rows satisfying A (A+), followed by a single row C

    • since C has no condition in the DEFINE clause, it simply terminates the greedy run of A; the measures reference only A, so the output reflects only the stable segments defined by A

  5. DEFINE:

    • A is defined as any row where both the current weight and the previous weight are not Null, and where they differ by less than 10g.

With just a few lines of code, we have isolated the time intervals in which the number of packages on the line is unchanging, resulting in stable measurements of package weights.

Simulating the Job in Python with PyFlink

To see these queries run in real time, we can simulate a data flow and real-time queries in Python using PyFlink. PyFlink is a Python library (published on PyPI as apache-flink) that allows us to create and manage Apache Flink jobs directly in Python.

Simulating Input

The first step to testing our MATCH_RECOGNIZE queries is to simulate live data measurements. There are several ways to do this. For simplicity, we make use of Flink’s ability to call “user-defined functions” (UDFs) by defining a simulated input class:

from datetime import datetime

import numpy as np
from pyflink.common import Row
from pyflink.table.udf import ScalarFunction


class CreateSimulatedModelInput(ScalarFunction):
    def open(self, function_context):
        """
        This is the open function - it runs once when the UDF is initialized.
        Use this to load the simulation data and other resources.
        """
        self.data = np.load('data.npy')
        self.index = 0
        self.max_ind = len(self.data)

    def eval(self, timestamp: datetime):
        """
        This is the eval function - it outputs one simulated data point per call.
        """
        if self.index == self.max_ind:
            self.index = 0
        weight = self.data[self.index]
        self.index = self.index + 1
        # Return a row matching the UDF's declared result type;
        # the sku_id value here is just a placeholder
        return Row(weight=float(weight), sku_id="simulated_sku")

Here, the simulated data simply loops through the rows of real historical data contained in a NumPy file named “data.npy”; on each call, eval returns the next weight (together with a placeholder sku_id so the output matches the UDF’s declared row type).
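
If you do not have historical data on hand, a stand-in “data.npy” can be synthesized. The sketch below (with arbitrary, illustrative levels and noise) creates a few stable plateaus - mimicking intervals where the number of bags on the line is unchanging - separated by noisy transitions:

import numpy as np

rng = np.random.default_rng(0)
segments = []
for level in [0, 450, 900, 450, 0]:                  # grams currently on the belt
    segments.append(level + rng.normal(0, 5, 50))    # stable plateau with ~5 g noise
    segments.append(rng.uniform(0, 1000, 10))        # noisy transition between plateaus
np.save("data.npy", np.concatenate(segments))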

Now we can make this callable in Flink by defining a UDF:

from pyflink.table import DataTypes
from pyflink.table.udf import udf

create_simulated_model_input = udf(CreateSimulatedModelInput(),
                                   input_types=[DataTypes.TIMESTAMP(3)],
                                   result_type=DataTypes.ROW(row_fields=[
                                       DataTypes.FIELD("weight", DataTypes.FLOAT()),
                                       DataTypes.FIELD("sku_id", DataTypes.STRING())
                                   ])
                                   )

Table Environments

To organize the flow of data, PyFlink uses table environments. To make our simulation more streamlined, we use the following function to create a table environment in Python:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

def create_table_env() -> StreamTableEnvironment:
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    table_env = StreamTableEnvironment.create(env)

    # no parallelism for this job
    table_env.get_config().set("parallelism.default", "1")

    # mark the source as idle after 1000 ms so that watermarks can still advance
    table_env.get_config().set("table.exec.source.idle-timeout", "1000")

    return table_env

Putting It All Together

With our helper functions and classes created, now we just have to create the table environment and make live calls to gather and process data. First, let us define the table environment:

from pyflink.table import Schema, DataTypes, TableDescriptor

#simulation speed
ROWS_PER_SECOND = 1

#creating the table environment
table_env = create_table_env()

schema_builder = Schema.new_builder()

schema_builder.column("event_time", DataTypes.TIMESTAMP(3))
schema_builder.watermark("event_time", "event_time - INTERVAL '5' SECOND")

schema = schema_builder.build()

table_env.create_temporary_table("trigger_source", descriptor=TableDescriptor.for_connector("datagen")
                                 .schema(schema)
                                 .option("rows-per-second", str(ROWS_PER_SECOND))
                                 .build())

source_table = table_env.from_path("trigger_source")

table_env.create_temporary_function("create_simulated_model_input", create_simulated_model_input)

Next, we can call the inputs, and subsequently, our MATCH_RECOGNIZE query:

# this query creates a live input table
query = table_env.sql_query("""
SELECT
    event_time,
    simulated_data.sku_id AS sku_id,
    simulated_data.weight AS weight
FROM (
    SELECT
        event_time,
        create_simulated_model_input(event_time) AS simulated_data
    FROM trigger_source
)
""")

table_env.create_temporary_view("input_table", query)

# this query calls MATCH_RECOGNIZE to identify stable sections
query = table_env.sql_query("""
    WITH difference_table as(
        SELECT 
            weight,
            event_time,
            LAG(weight, 1) over (order by event_time) as prev_weight
        FROM
            input_table
    )
    SELECT avg_weight, start_ts, end_ts, num_measurements
        FROM difference_table
        MATCH_RECOGNIZE (
            ORDER BY event_time
            MEASURES
                AVG(A.weight) AS avg_weight,  -- Calculate the average weight in the flat spot
                FIRST(A.event_time) AS start_ts,            -- Record the start timestamp of the interval
                LAST(A.event_time) AS end_ts,                -- Record the end timestamp
                COUNT(A.weight) as num_measurements
            ONE ROW PER MATCH
            AFTER MATCH SKIP PAST LAST ROW
            PATTERN (A+ C)
            DEFINE
                A AS A.weight IS NOT NULL AND
                    (A.prev_weight IS NOT NULL AND ABS(A.weight - A.prev_weight) < 10)
        );
""")

Note the additional pattern variable C. Currently, Apache Flink does not support greedy quantifiers as the last variable of a pattern. Since C has no condition in the DEFINE clause, it simply terminates the greedy A+ run, so our pattern remains a greedy search over A that is compliant with Flink. See the Flink documentation for additional details on current limitations.

Finally, we can display the results of our simulation directly in the Python terminal:

# Option 1: stream the model output straight to the terminal
# (for a streaming query this call blocks, so run only one option at a time)
query.execute().print()

# Option 2: collect a few result rows instead, e.g. for plotting
data_sample = []
num_samples = 1
for result in query.execute().collect():
    data_sample.append(result)
    if len(data_sample) >= num_samples:
        break

print(data_sample)
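
As a sketch of what plotting the collected output might look like, the rows can be handed to matplotlib. This assumes matplotlib is installed, that num_samples has been raised so more than one match is collected, and that the fields appear in the order selected by the query (avg_weight, start_ts, end_ts, num_measurements):

import matplotlib.pyplot as plt

avg_weights = [row[0] for row in data_sample]   # avg_weight
start_times = [row[1] for row in data_sample]   # start_ts

plt.step(start_times, avg_weights, where="post")
plt.xlabel("start of stable interval")
plt.ylabel("average weight (g)")
plt.title("Stable weight measurements identified by MATCH_RECOGNIZE")
plt.show()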

A sample output for the simulation is displayed below in Figure 5.

Figure 5: Table output of Python simulation of MATCH_RECOGNIZE queries on a live simulation of data

Results

Figure 6: Annotated graph of recorded weight values (in grams) versus time of measurement. The green boxes indicate data points that the algorithm kept, whereas the red boxes denote data points that were rejected.

By utilizing MATCH_RECOGNIZE, we are able to pinpoint the intervals where weight measurements have stabilized. This effectively eliminates the need to model how much any individual package is contributing to the checkweigher reading at a given moment. Combining these measurements with the number of bags on the line at the time of measurement, we are able to deduce the true yield loss within the process, as sketched below.
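
As a back-of-the-envelope illustration of that last step (the numbers below are made up, and the calculation assumes a properly tared checkweigher whose stable reading is simply the sum of the packages currently on the belt):

# Illustrative values only - not taken from the production data above
stable_reading_g = 2_540.0     # average weight from one matched stable interval
bags_on_line = 5               # number of packages on the belt during that interval
target_weight_g = 500.0        # target fill weight per package

avg_package_weight_g = stable_reading_g / bags_on_line              # 508.0 g
overfill_per_package_g = avg_package_weight_g - target_weight_g     # 8.0 g
yield_loss_pct = 100 * overfill_per_package_g / target_weight_g     # 1.6 %
print(f"~{overfill_per_package_g:.1f} g overfill per package "
      f"({yield_loss_pct:.1f}% yield loss)")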

Troubleshooting

  1. Issue: Infinite pattern matching

    • Solution: Ensure that not every single data point satisfies the pattern; otherwise the match never terminates and no rows will be emitted.

  2. Issue: Greedy quantifier is the last element of a pattern

    • Solution: According to Flink, “greedy quantifiers are not allowed as the last element of a Pattern yet.” If you must end your pattern with a greedy quantifier, you can add an undefined “dummy variable” to the end of your pattern. Since the dummy variable carries no condition, it simply terminates the greedy match, and the reported measures still reflect only the desired pattern(s).

  3. Issue: Watermarks are not assigned correctly.

    • Solution: Verify that the watermark strategy is correctly configured and the timestamps are assigned properly.

  4. Issue: Outliers can still be prevalent

    • Solution: MATCH_RECOGNIZE is not sufficient to identify anomalies within a distribution of data. More work needs to be done to identify and reject outliers, which is a topic covered in our next article!

Summary

Handling large-scale time-series data is a significant challenge in data processing, especially in manufacturing environments where vast amounts of complex data are generated continuously. Understanding these data is further complicated by the fact that they often contain several sources of unwanted noise and variance. At Ferry, we work with such data constantly and provide analysis on it in real time, built on Apache Flink, a powerful open-source stream processing framework.

Here, we have highlighted some difficulties associated with yield management. Through Flink's MATCH_RECOGNIZE function, we can identify patterns of desired datapoints, enabling the extraction of stable measurements from the checkweigher. Eliminating these instabilities in real time is efficient and paves the way for more complex analysis. With improved measurements, we can move forward with quantifying them in a meaningful way using statistics, which is the topic of the next article in this series.