跳到主要内容

IntegrationFlowAdapter

QWen Plus 中英对照 IntegrationFlowAdapter IntegrationFlowAdapter

IntegrationFlow 接口可以被直接实现并指定为一个组件进行扫描,如下例所示:

@Component
public class MyFlow implements IntegrationFlow {

@Override
public void configure(IntegrationFlowDefinition<?> f) {
f.<String, String>transform(String::toUpperCase);
}

}
java

它被 IntegrationFlowBeanPostProcessor 捕获,并正确解析和注册到应用程序上下文中。

为了方便并获得松耦合架构的好处,我们提供了 IntegrationFlowAdapter 基类实现。它需要实现 buildFlow() 方法来通过使用其中一个 from() 方法生成一个 IntegrationFlowDefinition,如下例所示:

@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {

private final AtomicBoolean invoked = new AtomicBoolean();

public Instant nextExecutionTime(TriggerContext triggerContext) {
return this.invoked.getAndSet(true) ? null : Instant.now();
}

@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return fromSupplier(this::messageSource,
e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
.split(this)
.transform(this)
.aggregate(this)
.enrichHeaders(Collections.singletonMap("thing1", "THING1"))
.filter(this)
.handle(this)
.channel(c -> c.queue("myFlowAdapterOutput"));
}

public String messageSource() {
return "T,H,I,N,G,2";
}

@Splitter
public String[] split(String payload) {
return StringUtils.commaDelimitedListToStringArray(payload);
}

@Transformer
public String transform(String payload) {
return payload.toLowerCase();
}

@Aggregator
public String aggregate(List<String> payloads) {
return payloads.stream().collect(Collectors.joining());
}

@Filter
public boolean filter(@Header Optional<String> thing1) {
return thing1.isPresent();
}

@ServiceActivator
public String handle(String payload, @Header String thing1) {
return payload + ":" + thing1;
}

}
java