Multi-IO Stream Support for Spring Cloud Data Flow for Kubernetes

This topic highlights the multi-IO support for Spring Cloud Data Flow for Kubernetes and answers some frequently asked questions.

Multi-IO Stream Support

Spring Cloud Data Flow OSS currently supports creating streaming pipelines only with applications that have a single input and output. The SCDF Pro server adds support for creating and deploying streaming applications that have multiple inputs and outputs (multi-IO). Because Spring Cloud Stream has built-in support for binding multiple inputs and outputs to the messaging system, Spring Cloud Data Flow can embed multi-IO applications into a streaming pipeline.

What are the features of multi-IO support in SCDF Pro server?

  • Multi-IO Stream DSL for constructing streaming pipelines from applications with multiple inputs and outputs
  • Stream Flo editor for wiring up multi-IO applications when creating the stream
  • Multi-IO Stream lifecycle management

How do multi-IO applications differ from applications with a single input and output?

Until now, SCDF has allowed only applications with a single input and output to be registered with Spring Cloud Data Flow. Such an application has its input and output endpoints configured through the Spring Cloud Stream framework’s @EnableBinding annotation, which provides endpoints named input and output. With the functional support in Spring Cloud Stream and the ability to configure function bindings, you can now choose named endpoints for your function’s inbound and outbound bindings.
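
For example, an application with a single input and output typically exposes one function. The following is a minimal sketch (the bean name process and the uppercase logic are illustrative assumptions); Spring Cloud Stream binds it by default to endpoints named process-in-0 and process-out-0:

@Bean
public Function<String, String> process() {
  // java.util.function.Function: single input, single output
  // bound by default to process-in-0 and process-out-0
  return input -> input.toUpperCase();
}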

For instance, consider a function that receives multiple inputs and a single output:

@Bean
public BiFunction<String, Long, String> process() {

  return (inputString, inputLong) -> {
    // process the inputs and produce the output (placeholder logic)
    return inputString + ": " + inputLong;
  };
}

For the preceding example, you could define the function bindings as:

spring.cloud.function.definition: process
spring.cloud.stream.function.bindings.process-in-0: myInput1
spring.cloud.stream.function.bindings.process-in-1: myInput2
spring.cloud.stream.function.bindings.process-out-0: processedOutput
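
With the core Spring Cloud Stream binder, functions with multiple inputs can also be expressed with reactive types, where each tuple element maps to one input binding. The following is a minimal sketch (the payload types and the combining logic are assumptions):

@Bean
public Function<Tuple2<Flux<String>, Flux<Long>>, Flux<String>> process() {
  // reactor.core.publisher.Flux and reactor.util.function.Tuple2
  return inputs -> {
    Flux<String> firstInput = inputs.getT1();  // process-in-0 (myInput1)
    Flux<Long> secondInput = inputs.getT2();   // process-in-1 (myInput2)
    // combine the two input streams into a single output stream (process-out-0)
    return Flux.combineLatest(firstInput, secondInput, (s, l) -> s + ": " + l);
  };
}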

Once you have defined your function bindings as logical names, you can configure those logical names as the inbound and outbound ports of your application by adding container image labels with the following key-value pairs:

configuration-properties.outbound-ports=processedOutput
configuration-properties.inbound-ports=myInput1,myInput2

You can use any build tool to apply these labels to the container image. For example, if you use the Maven Jib plugin to generate the container image, you can add the inbound and outbound ports as container image labels, as follows:

<container>
  <creationTime>USE_CURRENT_TIMESTAMP</creationTime>
  <format>Docker</format>
  <labels>
    <configuration-properties.outbound-ports>processedOutput</configuration-properties.outbound-ports>
    <configuration-properties.inbound-ports>myInput1,myInput2</configuration-properties.inbound-ports>
  </labels>
</container>
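
For context, the <container> element shown above belongs inside the jib-maven-plugin configuration. A minimal sketch of the surrounding plugin declaration (the plugin version, base image, and target image are assumptions):

<plugin>
  <groupId>com.google.cloud.tools</groupId>
  <artifactId>jib-maven-plugin</artifactId>
  <version>2.5.2</version>
  <configuration>
    <from>
      <image>adoptopenjdk/openjdk8:latest</image>
    </from>
    <to>
      <image>myregistry/multi-io-processor:1.0.0-SNAPSHOT</image>
    </to>
    <container>
      <!-- creationTime, format, and the inbound/outbound port labels shown above -->
    </container>
  </configuration>
</plugin>

You can then build the image to your local Docker daemon with mvn jib:dockerBuild or build and push it with mvn jib:build.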

Once built and registered with Spring Cloud Data Flow, the application is ready to be wired into a stream.

It is important to map the logical names of the function bindings to the inbound and outbound ports correctly, because SCDF uses those logical names to wire up the applications in a stream. In upcoming releases, we plan to add automation and validation that derives these inbound and outbound names from the function bindings instead of requiring manual entries.

Do we have a sample application that demonstrates multi-IO support?

We have created a set of sample applications that let you see the multi-IO support in action.

In this sample, you can see how to use a KStream processor (with multiple inputs and a single output) in a streaming pipeline.

This streaming pipeline captures user click events (number of clicks per user) and user region events (user region) as HTTP events.

The KStream processor adds a function that joins both the user-clicks and user-regions and computes the number of user clicks per region.
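
The following is a minimal sketch of what such a processor could look like with the Kafka Streams binder (this is not the actual sample code; the payload types, serdes, and join logic are assumptions):

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
  // first input: click counts keyed by user; second input: regions keyed by user
  return (userClicks, userRegions) -> userClicks
      // join each click count with the user's region (falling back to UNKNOWN)
      .leftJoin(userRegions,
          (clicks, region) -> new KeyValue<>(region == null ? "UNKNOWN" : region, clicks),
          Joined.with(Serdes.String(), Serdes.Long(), Serdes.String()))
      // re-key the stream by region
      .map((user, regionAndClicks) -> regionAndClicks)
      // sum the clicks per region and emit the running totals
      .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
      .reduce(Long::sum)
      .toStream();
}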

There is a sample sink application that logs the result of the KStream processor’s output.

Register the applications:

Go to App Registry and bulk import the following:

source.clicks-ingest=docker://springcloudstream/multi-io-sample-http-click-ingest:1.0.0-SNAPSHOT
source.regions-ingest=docker://springcloudstream/multi-io-sample-http-region-ingest:1.0.0-SNAPSHOT
processor.clicks-per-region=docker://springcloudstream/multi-io-sample-user-clicks-per-region-processor:1.0.0-SNAPSHOT
sink.clicks-per-region-logger=docker://springcloudstream/multi-io-sample-log-user-clicks-per-region:1.0.0-SNAPSHOT
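
If you prefer the SCDF shell, the same applications could also be registered individually (a sketch of the equivalent app register commands):

app register --name clicks-ingest --type source --uri docker://springcloudstream/multi-io-sample-http-click-ingest:1.0.0-SNAPSHOT
app register --name regions-ingest --type source --uri docker://springcloudstream/multi-io-sample-http-region-ingest:1.0.0-SNAPSHOT
app register --name clicks-per-region --type processor --uri docker://springcloudstream/multi-io-sample-user-clicks-per-region-processor:1.0.0-SNAPSHOT
app register --name clicks-per-region-logger --type sink --uri docker://springcloudstream/multi-io-sample-log-user-clicks-per-region:1.0.0-SNAPSHOT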

Once added, create a streaming pipeline by using the Stream Editor, as follows:

Multi-IO Streaming Pipeline

You can also create and deploy the stream by using the following DSL:

clicks-ingest --server.port=9001 || regions-ingest --server.port=9000 || clicks-per-region :userRegions<:regions-ingest.user-regions :userClicks<:clicks-ingest.user-clicks || clicks-per-region-logger :_<:clicks-per-region.clicksPerRegion

As you can see, clicks-per-region is a KStream application with two inputs and one output.

To post the HTTP events, you need to either port-forward the pods (on Minikube) or use the service endpoints (if a Service endpoint is available) of the clicks-ingest and regions-ingest deployments. Because this sample runs on Minikube, the following sample data is posted to http://localhost:9000 and http://localhost:9001 after port-forwarding the associated pods.
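
For example, on Minikube you could port-forward the two ingest deployments to those local ports (a sketch; the actual deployment names depend on the stream name and the Kubernetes deployer's naming convention):

kubectl get deployments
kubectl port-forward deployment/<regions-ingest-deployment> 9000:9000
kubectl port-forward deployment/<clicks-ingest-deployment> 9001:9001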

Post some HTTP events as user region events:

curl -X POST http://localhost:9000 -H "username: Glenn" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Soby" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Janne" -d "europe" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: David" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Ilaya" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Mark" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Sabby" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Ilaya" -d "asia" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Chris" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Damien" -d "europe" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Christian" -d "europe" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Thomas" -d "americas" -H "Content-Type: text/plain"

Next, post some HTTP events as user click events:

curl -X POST http://localhost:9001 -H "username: Glenn" -d 9 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Soby" -d 15 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Janne" -d 10 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Mark" -d 7 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: David" -d 15 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Sabby" -d 20 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Ilaya" -d 10 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Chris" -d 5 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Damien" -d 21 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Christian" -d 12 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Thomas" -d 12 -H "Content-Type: text/plain"

Once both the user click and user region events are posted, the KStream processor starts computing the user clicks per region, and you can see the result in the logger application’s log:

2020-08-04 21:53:36.735  INFO 1 --- [container-0-C-1] user-clicks-per-region                   : europe: 139
2020-08-04 21:53:36.751  INFO 1 --- [container-0-C-1] user-clicks-per-region                   : americas: 297
2020-08-04 21:53:36.754  INFO 1 --- [container-0-C-1] user-clicks-per-region                   : asia: 40
2020-08-04 21:53:37.623  INFO 1 --- [container-0-C-1] user-clicks-per-region                   : europe: 172
2020-08-04 21:53:37.625  INFO 1 --- [container-0-C-1] user-clicks-per-region                   : americas: 314

The code for this sample is located in the user-clicks-per-region GitHub repository.

How does the multi-IO support differ from adding a tap to a stream?

When the output of a streaming application is tapped to create a parallel pipeline, the tap is added to that same output of the application. The output acts as a publisher for all the pipeline consumers tapped into it, and each consumer belongs to a unique consumer group. Also, you can tap only into a stream application’s output. With multi-IO support, Spring Cloud Data Flow binds an application that has multiple inputs and outputs: each input or output has its own function binding, and Spring Cloud Data Flow configures each function binding to a destination at the messaging system.
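
For comparison, a tap in the classic Stream DSL reads from the named destination of an existing stream's application, as in the following sketch (the stream and application names are illustrative):

stream create --name mainstream --definition "http | jdbc"
stream create --name tapstream --definition ":mainstream.http > log"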

How can I achieve multi-IO in SCDF OSS?

Currently, multi-IO support is available only in the Pro server and the Pro Dashboard. To deploy multi-IO streaming applications with SCDF OSS, you have to register the applications by using the App type and manually configure each application’s function bindings.
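
With the Stream Application DSL in SCDF OSS, applications are separated by the double pipe symbol (||) and are not wired together automatically, so you set each binding's destination yourself. The following is a minimal sketch (the application names, binding names, and destination names are illustrative assumptions):

stream create --name multi-io-oss --definition "clicksIngest || regionsIngest || clicksPerRegion || clicksLogger"
stream deploy --name multi-io-oss --properties "app.clicksPerRegion.spring.cloud.stream.bindings.myInput1.destination=user-clicks,app.clicksPerRegion.spring.cloud.stream.bindings.myInput2.destination=user-regions,app.clicksPerRegion.spring.cloud.stream.bindings.processedOutput.destination=clicks-per-region"

The sources and the sink need matching destination settings so that their outputs and inputs line up with the processor's bindings.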