Multi-IO Stream Support in Spring Cloud Data Flow for Kubernetes

Spring Cloud Data Flow (SCDF) OSS currently supports only streaming pipelines built from applications that have a single input and a single output. The SCDF Pro server adds support for creating and deploying streaming pipelines using applications that have multiple inputs and outputs (referred to as “multi-IO”). Spring Cloud Stream includes built-in support for binding multi-IO applications to the messaging system, and Spring Cloud Data Flow for Kubernetes (SCDF for Kubernetes) can embed multi-IO applications into a streaming pipeline.

This topic describes some highlights of the multi-IO support in SCDF for Kubernetes and answers some frequently-asked questions.

What Are the Multi-IO Features in the SCDF Pro Server?

SCDF Pro adds the following features to support multi-IO stream applications:

  • A multi-IO stream DSL for constructing streaming pipelines using applications that have multiple inputs and outputs
  • A Stream Flo editor for wiring up multi-IO applications when creating a stream
  • Lifecycle management for multi-IO streams

How Do Multi-IO Applications Differ From Single-IO Applications?

Previously, SCDF supported registering only stream applications with a single input and a single output, and those endpoints had to be bound through the Spring Cloud Stream @EnableBinding annotation. With the functional programming model added to Spring Cloud Stream, together with configurable function bindings, you can choose named endpoints for the inbound and outbound bindings of your function.

For instance, consider a function that has multiple inputs and a single output:

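import java.util.function.BiFunction;

import org.springframework.context.annotation.Bean;

// Declared inside a @Configuration class of the Spring Boot application
@Bean
public BiFunction<String, Long, String> process() {
  // The first argument arrives on input binding 0, the second on input binding 1
  return (inputString, inputLong) -> {
    // Process inputs and produce output (this concatenation is illustrative only)
    return inputString + ":" + inputLong;
  };
}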

In this example, you could define the following function bindings:

spring.cloud.function.definition: process
spring.cloud.stream.function.bindings.process-in-0: myInput1
spring.cloud.stream.function.bindings.process-in-1: myInput2
spring.cloud.stream.function.bindings.process-out-0: processedOutput
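The names on the left of these properties follow Spring Cloud Stream's default binding-name convention for functions: <functionName>-in-<index> for each input and <functionName>-out-<index> for each output, with indexes starting at 0. As a minimal sketch, the plain-Java helper below derives those default names and renders the renaming properties. BindingNames and its methods are hypothetical illustrations, not part of Spring Cloud Stream.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Spring API) illustrating Spring Cloud Stream's
// default functional binding names: <function>-in-<i> and <function>-out-<i>.
final class BindingNames {

    private BindingNames() {
    }

    // Default input binding names for a function with the given number of inputs.
    static List<String> inputs(String function, int count) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            names.add(function + "-in-" + i);
        }
        return names;
    }

    // Default output binding names for a function with the given number of outputs.
    static List<String> outputs(String function, int count) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            names.add(function + "-out-" + i);
        }
        return names;
    }

    // Renders one spring.cloud.stream.function.bindings entry per default name,
    // pairing it with the logical name at the same position.
    static List<String> renameProperties(List<String> defaults, List<String> logicalNames) {
        if (defaults.size() != logicalNames.size()) {
            throw new IllegalArgumentException("expected one logical name per binding");
        }
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < defaults.size(); i++) {
            lines.add("spring.cloud.stream.function.bindings." + defaults.get(i) + ": " + logicalNames.get(i));
        }
        return lines;
    }
}
```

For the process function, inputs("process", 2) yields process-in-0 and process-in-1, which the properties above rename to myInput1 and myInput2.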

After mapping the function bindings to logical names, declare those logical names as the inbound and outbound ports of your application by adding container image labels with the following keys and values:

configuration-properties.outbound-ports: processedOutput
configuration-properties.inbound-ports: myInput1,myInput2

You can use any build tool to apply the labels to the container image. The following example assumes that you use the Jib Maven plugin to build the container image and adds the inbound and outbound ports as labels in the plugin's container configuration:

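<plugin>
  <groupId>com.google.cloud.tools</groupId>
  <artifactId>jib-maven-plugin</artifactId>
  <configuration>
    <container>
      <creationTime>USE_CURRENT_TIMESTAMP</creationTime>
      <format>Docker</format>
      <!-- Port labels: logical binding names used by SCDF to wire the stream -->
      <labels>
        <configuration-properties.outbound-ports>processedOutput</configuration-properties.outbound-ports>
        <configuration-properties.inbound-ports>myInput1,myInput2</configuration-properties.inbound-ports>
      </labels>
    </container>
  </configuration>
</plugin>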

After building the application and registering it with SCDF, you can wire this application into a stream.

Ensure that you correctly map the logical names of the function bindings to the inbound and outbound ports. SCDF uses the logical names to wire up the applications in a stream.
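One way to catch a mismatch before building the image is a small consistency check. The sketch below assumes that the function-binding properties are available as a key/value map and that the label values are comma-separated, as in the examples above; PortLabelCheck is a hypothetical helper, not an SCDF or Spring API. It reports every inbound port that is not the logical name of an input binding and every outbound port that is not the logical name of an output binding.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical consistency check (not an SCDF or Spring API): every port named
// in the image labels should be the logical name of a matching function binding.
final class PortLabelCheck {

    private static final String PREFIX = "spring.cloud.stream.function.bindings.";

    private PortLabelCheck() {
    }

    // Splits a comma-separated label value such as "myInput1,myInput2".
    static Set<String> ports(String labelValue) {
        Set<String> ports = new LinkedHashSet<>();
        for (String port : labelValue.split(",")) {
            if (!port.isBlank()) {
                ports.add(port.trim());
            }
        }
        return ports;
    }

    // Logical names assigned to default bindings whose name contains the marker ("-in-" or "-out-").
    static Set<String> logicalNames(Map<String, String> properties, String marker) {
        Set<String> names = new TreeSet<>();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(PREFIX) && key.substring(PREFIX.length()).contains(marker)) {
                names.add(entry.getValue().trim());
            }
        }
        return names;
    }

    // Returns "inbound:<port>" or "outbound:<port>" for every label entry without a matching binding.
    static Set<String> problems(Map<String, String> properties, String inboundLabel, String outboundLabel) {
        Set<String> inputs = logicalNames(properties, "-in-");
        Set<String> outputs = logicalNames(properties, "-out-");
        Set<String> found = new TreeSet<>();
        for (String port : ports(inboundLabel)) {
            if (!inputs.contains(port)) {
                found.add("inbound:" + port);
            }
        }
        for (String port : ports(outboundLabel)) {
            if (!outputs.contains(port)) {
                found.add("outbound:" + port);
            }
        }
        return found;
    }
}
```

An empty result means every labeled port has a corresponding function binding.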

Is There a Sample Application Demonstrating Multi-IO Support?

You can use a set of ready-made sample applications to see multi-IO support in action. Download the sample code from the spring-cloud/spring-cloud-dataflow-samples repository on GitHub.

The sample applications demonstrate how to build a streaming pipeline around a KStream processor that has multiple inputs and a single output. The pipeline ingests user click events (the number of clicks per user) and user region events (the region of each user) over HTTP. The KStream processor application includes a function that joins the user clicks with the user regions and then computes the number of user clicks per region. Finally, a sink application logs the output of the KStream processor.

Begin by registering the sample applications. Visit the SCDF App Registry and bulk import the following applications:

source.clicks-ingest=docker://springcloudstream/multi-io-sample-http-click-ingest:1.0.0-SNAPSHOT
source.regions-ingest=docker://springcloudstream/multi-io-sample-http-region-ingest:1.0.0-SNAPSHOT
processor.clicks-per-region=docker://springcloudstream/multi-io-sample-user-clicks-per-region-processor:1.0.0-SNAPSHOT
sink.clicks-per-region-logger=docker://springcloudstream/multi-io-sample-log-user-clicks-per-region:1.0.0-SNAPSHOT
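If you prefer to script the import instead of using the dashboard, the following sketch builds an equivalent bulk-registration request with the JDK's java.net.http client. It assumes that the server exposes the standard SCDF REST endpoint POST /apps, which accepts the registrations inline in an apps form parameter, and that the server is reachable at http://localhost:9393 (for example, through port forwarding); adjust both for your installation. BulkImportRequest is a hypothetical helper.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical helper: builds (but does not send) a bulk app-registration request,
// assuming the server accepts POST /apps with an inline "apps" form parameter.
final class BulkImportRequest {

    static final List<String> SAMPLE_APPS = List.of(
        "source.clicks-ingest=docker://springcloudstream/multi-io-sample-http-click-ingest:1.0.0-SNAPSHOT",
        "source.regions-ingest=docker://springcloudstream/multi-io-sample-http-region-ingest:1.0.0-SNAPSHOT",
        "processor.clicks-per-region=docker://springcloudstream/multi-io-sample-user-clicks-per-region-processor:1.0.0-SNAPSHOT",
        "sink.clicks-per-region-logger=docker://springcloudstream/multi-io-sample-log-user-clicks-per-region:1.0.0-SNAPSHOT");

    private BulkImportRequest() {
    }

    // URL-encodes the registrations, one per line, as the "apps" form field.
    static String formBody(List<String> registrations) {
        return "apps=" + URLEncoder.encode(String.join("\n", registrations), StandardCharsets.UTF_8);
    }

    // Creates the POST request against <serverUrl>/apps.
    static HttpRequest build(String serverUrl, List<String> registrations) {
        return HttpRequest.newBuilder(URI.create(serverUrl + "/apps"))
            .header("Content-Type", "application/x-www-form-urlencoded")
            .POST(HttpRequest.BodyPublishers.ofString(formBody(registrations)))
            .build();
    }
}
```

To send the request, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) and check the returned status code.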

After importing the applications, create a streaming pipeline using the Stream Editor:

[Figure: Multi-IO Streaming Pipeline]

Alternatively, you can define the same stream with the multi-IO stream DSL and deploy it:

clicks-ingest --server.port=9001 || regions-ingest --server.port=9000 || clicks-per-region :userRegions<:regions-ingest.user-regions :userClicks<:clicks-ingest.user-clicks || clicks-per-region-logger :_<:clicks-per-region.clicksPerRegion

clicks-per-region is a KStream processor that has two inputs (userRegions and userClicks) and one output (clicksPerRegion).
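Reading the example, each token of the form :port<:destination appears to bind one inbound port of an application to a destination, and the destination names look like <application>.<outbound port>; the logger's :_ seems to stand for its single default input. The sketch below encodes that reading so that you can check which destination feeds each port. It is inferred from this one example only, and MultiIoDslReading is a hypothetical helper, not the SCDF Pro DSL parser.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical reading of the multi-IO DSL tokens in the example above:
// ":<port><:<destination>" is taken to mean "bind inbound <port> to <destination>".
// Inferred from one example; not an SCDF Pro grammar.
final class MultiIoDslReading {

    private MultiIoDslReading() {
    }

    // Maps each inbound port of one application definition to its source destination.
    static Map<String, String> inboundWiring(String appDefinition) {
        Map<String, String> wiring = new LinkedHashMap<>();
        for (String token : appDefinition.trim().split("\\s+")) {
            int arrow = token.indexOf("<:");
            // Skip the application name and ordinary options such as --server.port=9000
            if (token.startsWith(":") && arrow > 0) {
                wiring.put(token.substring(1, arrow), token.substring(arrow + 2));
            }
        }
        return wiring;
    }
}
```

Applied to the clicks-per-region segment, this reading maps userRegions to regions-ingest.user-regions and userClicks to clicks-ingest.user-clicks.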

To post HTTP events, you need to reach the clicks-ingest and regions-ingest applications: either use port forwarding to their pods (for example, on Minikube) or use the endpoints of their Service resources, if available. This example assumes that the sample applications are deployed on Minikube and that you forward the pod ports to localhost (9000 for regions-ingest and 9001 for clicks-ingest).

Post several user region events to the regions-ingest application on port 9000:

curl -X POST http://localhost:9000 -H "username: Glenn" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Soby" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Janne" -d "europe" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: David" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Ilaya" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Mark" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Sabby" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Ilaya" -d "asia" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Chris" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Damien" -d "europe" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Christian" -d "europe" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Thomas" -d "americas" -H "Content-Type: text/plain"

Then post user click events to the clicks-ingest application on port 9001:

curl -X POST http://localhost:9001 -H "username: Glenn" -d 9 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Soby" -d 15 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Janne" -d 10 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Mark" -d 7 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: David" -d 15 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Sabby" -d 20 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Ilaya" -d 10 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Chris" -d 5 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Damien" -d 21 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Christian" -d 12 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Thomas" -d 12 -H "Content-Type: text/plain"
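If you would rather post the events from code than with curl, the following sketch builds the same requests with the JDK's java.net.http API: a POST whose body is the plain-text value and whose username header carries the user name. It assumes the port-forwarded addresses used above (localhost:9000 for regions-ingest and localhost:9001 for clicks-ingest); IngestEvents is a hypothetical helper.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper: Java equivalent of the curl commands above, assuming the
// ingest applications are port-forwarded to localhost:9000 and localhost:9001.
final class IngestEvents {

    static final URI REGIONS = URI.create("http://localhost:9000");
    static final URI CLICKS = URI.create("http://localhost:9001");

    private IngestEvents() {
    }

    // Builds a POST carrying the user name in a "username" header and a plain-text body.
    static HttpRequest event(URI target, String username, String body) {
        return HttpRequest.newBuilder(target)
            .header("username", username)
            .header("Content-Type", "text/plain")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
    }

    // Sends one event, ignores the response body, and returns the HTTP status code.
    static int send(HttpClient client, HttpRequest request) throws IOException, InterruptedException {
        return client.send(request, HttpResponse.BodyHandlers.discarding()).statusCode();
    }
}
```

For example, send(HttpClient.newHttpClient(), event(CLICKS, "Glenn", "9")) posts the first click event and returns the HTTP status code.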

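After both the user click events and the user region events have been posted, the KStream processor begins computing the number of user clicks per region. You can see the results in the logs of the clicks-per-region-logger application. The output looks similar to the following; the exact counts depend on the events that have been processed: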

2020-08-04 21:53:36.735  INFO 1 --- [container-0-C-1] user-clicks-per-region                   : europe: 139
2020-08-04 21:53:36.751  INFO 1 --- [container-0-C-1] user-clicks-per-region                   : americas: 297
2020-08-04 21:53:36.754  INFO 1 --- [container-0-C-1] user-clicks-per-region                   : asia: 40
2020-08-04 21:53:37.623  INFO 1 --- [container-0-C-1] user-clicks-per-region                   : europe: 172
2020-08-04 21:53:37.625  INFO 1 --- [container-0-C-1] user-clicks-per-region                   : americas: 314
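To reason about what the processor should report, the following plain-Java model (not Kafka Streams code) mimics the join-then-aggregate step under two assumptions: the region events are applied before the click events, and a later region event for a user replaces the earlier one, as in a KTable. With only the events posted above, it yields americas 83, europe 43, and asia 10 (Ilaya counts toward asia). The log excerpt shows larger, updating totals, so it was presumably captured after more events had been processed; compare the model with your own run instead. Grouping users without a region under UNKNOWN is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (not Kafka Streams) of the sample's join-and-aggregate step:
// regions act as a table (latest region per user wins), each click is looked up
// against that table, and clicks are summed per region.
final class ClicksPerRegionModel {

    // Region events in posting order; a later event for the same user overwrites the earlier one.
    static final List<Map.Entry<String, String>> REGIONS = List.of(
        Map.entry("Glenn", "americas"), Map.entry("Soby", "americas"),
        Map.entry("Janne", "europe"), Map.entry("David", "americas"),
        Map.entry("Ilaya", "americas"), Map.entry("Mark", "americas"),
        Map.entry("Sabby", "americas"), Map.entry("Ilaya", "asia"),
        Map.entry("Chris", "americas"), Map.entry("Damien", "europe"),
        Map.entry("Christian", "europe"), Map.entry("Thomas", "americas"));

    // Click events in posting order: (user, number of clicks).
    static final List<Map.Entry<String, Long>> CLICKS = List.of(
        Map.entry("Glenn", 9L), Map.entry("Soby", 15L), Map.entry("Janne", 10L),
        Map.entry("Mark", 7L), Map.entry("David", 15L), Map.entry("Sabby", 20L),
        Map.entry("Ilaya", 10L), Map.entry("Chris", 5L), Map.entry("Damien", 21L),
        Map.entry("Christian", 12L), Map.entry("Thomas", 12L));

    private ClicksPerRegionModel() {
    }

    static Map<String, Long> totals(List<Map.Entry<String, String>> regions,
                                    List<Map.Entry<String, Long>> clicks) {
        Map<String, String> regionByUser = new LinkedHashMap<>();
        for (Map.Entry<String, String> event : regions) {
            regionByUser.put(event.getKey(), event.getValue()); // table semantics: last write wins
        }
        Map<String, Long> totals = new TreeMap<>();
        for (Map.Entry<String, Long> click : clicks) {
            String region = regionByUser.getOrDefault(click.getKey(), "UNKNOWN");
            totals.merge(region, click.getValue(), Long::sum);
        }
        return totals;
    }
}
```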

How Does Multi-IO Support Differ From Adding a Tap to a Stream?

When you tap the output of a stream application to create a parallel pipeline, the tap is attached to that output. The output acts as a publisher for every pipeline that taps into it, and each consuming pipeline belongs to its own consumer group. Tapping works only on the outputs of a stream application.

With multi-IO support, SCDF binds each input and output of an application independently: every input or output has its own function binding, and SCDF maps each function binding to its own destination in the messaging system.
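The difference in delivery can be illustrated with a toy model of publish/subscribe with consumer groups; it is not a messaging-system API. Every group receives every message published to a destination, while within a group each message reaches exactly one member. Round-robin assignment is used here for simplicity, whereas real brokers assign work by partition or similar mechanisms. A tap behaves like an additional group on the same output, so it sees every message that the main pipeline sees.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (not a messaging API) of consumer groups on one destination:
// each group gets every message; inside a group, members share the messages.
final class ConsumerGroupsModel {

    private ConsumerGroupsModel() {
    }

    // groups maps a group name to its member count; returns "group/member" -> messages received.
    static Map<String, List<String>> deliver(List<String> messages, Map<String, Integer> groups) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> group : groups.entrySet()) {
            int members = group.getValue();
            if (members < 1) {
                throw new IllegalArgumentException("group " + group.getKey() + " needs at least one member");
            }
            for (int m = 0; m < members; m++) {
                received.put(group.getKey() + "/" + m, new ArrayList<>());
            }
            // Round-robin within the group: message i goes to member i % members.
            for (int i = 0; i < messages.size(); i++) {
                received.get(group.getKey() + "/" + (i % members)).add(messages.get(i));
            }
        }
        return received;
    }
}
```

With a main group of two members and a tap group of one, each main member sees half of the messages while the tap sees all of them.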

How Can I Replicate Multi-IO Support in SCDF OSS?

Multi-IO support is currently available only in the SCDF Pro server and SCDF Pro dashboard. To deploy multi-IO streaming applications with SCDF OSS, you must register them with the app type and manually configure the function bindings of each application.
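As an illustration of that manual configuration, connecting two applications comes down to pointing the producer's output binding and the consumer's input binding at the same destination through the Spring Cloud Stream property spring.cloud.stream.bindings.<binding>.destination, passed as app.<application>.<property> deployment properties. The sketch below only generates those property strings. The binding name user-clicks for the clicks-ingest output and the destination name user-clicks-topic are hypothetical examples, and ManualWiring is a hypothetical helper; substitute the binding names your applications actually use.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: generates SCDF deployment properties that wire app-type
// applications by hand via spring.cloud.stream.bindings.<binding>.destination.
final class ManualWiring {

    private ManualWiring() {
    }

    // Binds one binding of one application to a destination.
    static String destinationProperty(String app, String binding, String destination) {
        return "app." + app + ".spring.cloud.stream.bindings." + binding + ".destination=" + destination;
    }

    // Points the producer's output binding and the consumer's input binding at the same destination.
    static List<String> connect(String producer, String output, String consumer, String input, String destination) {
        List<String> props = new ArrayList<>();
        props.add(destinationProperty(producer, output, destination));
        props.add(destinationProperty(consumer, input, destination));
        return props;
    }
}
```

For example, connect("clicks-ingest", "user-clicks", "clicks-per-region", "userClicks", "user-clicks-topic") produces one property for each side of that connection; repeat it for every input and output of the multi-IO application.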