该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0!spring-doc.cadn.net.cn

事件路由

在春季云流的语境下,事件路由是指能够将事件路由到特定事件订阅者,或 b)将事件订阅者产生的事件路由到特定目的地的能力。 这里我们称之为路径“TO”和路由“FROM”。spring-doc.cadn.net.cn

路由到消费者

路由可以通过依赖路由函数可在Spring Cloud Function 3.0中提供。你只需要通过以下方式启用它--spring.cloud.stream.function.routing.enabled=true应用属性或提供Spring.cloud.function.routing-expression财产。 启用后路由函数将绑定到输入目的地 接收所有消息,并根据提供的指令将其路由到其他功能。spring-doc.cadn.net.cn

为了绑定,路由目的的名称为functionRouter-in-0(参见RoutingFunction.FUNCTION_NAME和绑定命名规范功能绑定名称)。

指令可以通过单独的消息和应用程序属性来提供。spring-doc.cadn.net.cn

以下是几个示例:spring-doc.cadn.net.cn

使用消息头

@SpringBootApplication
public class SampleApplication {

	public static void main(String[] args) {
		SpringApplication.run(SampleApplication.class,
                       "--spring.cloud.stream.function.routing.enabled=true");
	}

	@Bean
	public Consumer<String> even() {
		return value -> {
			System.out.println("EVEN: " + value);
		};
	}

	@Bean
	public Consumer<String> odd() {
		return value -> {
			System.out.println("ODD: " + value);
		};
    }
}

通过向functionRouter-in-0目的地由活页夹揭示(如兔子、卡夫卡), 该消息将被路由到相应的(“偶数”或“奇数”)消费者。spring-doc.cadn.net.cn

默认情况下路由函数我会寻找spring.cloud.function.definitionSpring.cloud.function.routing-expression(用于更动态的特殊语言场景) 如果找到了,其值将被视为路由指令。spring-doc.cadn.net.cn

例如 设置Spring.cloud.function.routing-expression头到值T(java.lang.System).currentTimeMillis() % 2 == 0 ?“偶数”:“奇数”最终会半随机地将请求路由到以下任一奇怪甚至功能。 此外,对于 SpEL,评估上下文的根对象消息所以你也可以对单个头部(或消息)做评估…​.routing-expression=headers['type']spring-doc.cadn.net.cn

应用属性的使用

Spring.cloud.function.routing-expression和/或spring.cloud.function.definition可以传递为应用程序属性(例如,Spring.cloud.function.routing-expression=headers['type'].spring-doc.cadn.net.cn

@SpringBootApplication
public class RoutingStreamApplication {

  public static void main(String[] args) {
      SpringApplication.run(RoutingStreamApplication.class,
	  "--spring.cloud.function.routing-expression="
	  + "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
  }
  @Bean
  public Consumer<Integer> even() {
    return value -> System.out.println("EVEN: " + value);
  }

  @Bean
  public Consumer<Integer> odd() {
    return value -> System.out.println("ODD: " + value);
  }
}
通过应用属性传递指令对于响应式函数尤为重要,因为响应式函数 该功能仅调用一次以传递出版商,因此对单个项目的访问受限。

路由函数与输出绑定

路由函数功能因此,对待与其他功能无异。井。。。几乎。spring-doc.cadn.net.cn

什么时候路由函数通往另一条路线功能其输出被发送到输出绑定中路由函数哪 是functionRouter-in-0不出所料。但如果路由函数通往A的路线消费者?换句话说,就是祈祷的结果 关于路由函数可能无法产生任何内容发送到输出绑定,因此甚至需要有绑定。 所以,我们确实请客路由函数制作绑定时的方式有些不同。即使对你这个用户来说是透明的 (其实你没什么可做的),了解一些机制会帮助你理解它的内部运作。spring-doc.cadn.net.cn

所以,规则是: 我们从不为路由函数,只有输入。所以当你路由到时消费者路由函数有效 变为消费者没有任何输出绑定。然而,如果路由函数恰好转接到另一个功能该 输出,输出绑定路由函数将动态生成,何时路由函数将作为常驻演员功能关于绑定(同时具有输入和输出绑定)。spring-doc.cadn.net.cn

来自消费者的路由

除了静态目的地外,Spring Cloud Stream 还允许应用程序向动态绑定的目的地发送消息。 例如,当需要在运行时确定目标目的地时,这非常有用。 应用程序可以通过两种方式实现。spring-doc.cadn.net.cn

Spring.cloud.stream.sendto.destination

你也可以委派给框架,通过指定动态解析输出目的地Spring.cloud.stream.sendto.destination页眉 设置为待解决的目的地名称。spring-doc.cadn.net.cn

请考虑以下例子:spring-doc.cadn.net.cn

@SpringBootApplication
@Controller
public class SourceWithDynamicDestination {

    @Bean
	public Function<String, Message<String>> destinationAsPayload() {
		return value -> {
			return MessageBuilder.withPayload(value)
				.setHeader("spring.cloud.stream.sendto.destination", value).build();};
	}
}

虽然很简单,但你可以清楚地看到,我们的输出是一个消息,其中Spring.cloud.stream.sendto.destination页眉 设置为输入参数的值。框架会参考该标题,并尝试创建或发现 一个名为该目的的目的地,并向其发送输出。spring-doc.cadn.net.cn

如果目的地名称提前已知,你可以像配置其他目的地一样配置生产者属性。 或者,如果你注册了NewDestinationBindingCallback<>Bean,它是在绑定形成前被调用的。 回调采用结合剂所使用的扩展生产者特性的通用类型。 它有一种方法:spring-doc.cadn.net.cn

void configure(String destinationName, MessageChannel channel, ProducerProperties producerProperties,
        T extendedProducerProperties);

以下示例展示了如何使用RabbitMQ绑定器:spring-doc.cadn.net.cn

@Bean
public NewDestinationBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
    return (name, channel, props, extended) -> {
        props.setRequiredGroups("bindThisQueue");
        extended.setQueueNameGroupOnly(true);
        extended.setAutoBindDlq(true);
        extended.setDeadLetterQueueName("myDLQ");
    };
}
如果你需要支持多种活发器类型的动态目的地,可以使用对象对于一般类型,铸造扩展必要时争论。

此外,请参阅“使用 StreamBridge”部分,了解如何在类似情况下使用另一种选项。spring-doc.cadn.net.cn