本文整理汇总了Java中org.springframework.integration.dsl.SourcePollingChannelAdapterSpec类的典型用法代码示例。如果您正苦于以下问题:Java SourcePollingChannelAdapterSpec类的具体用法?Java SourcePollingChannelAdapterSpec怎么用?Java SourcePollingChannelAdapterSpec使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
SourcePollingChannelAdapterSpec类属于org.springframework.integration.dsl包,在下文中一共展示了SourcePollingChannelAdapterSpec类的8个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: pollingFlow
import org.springframework.integration.dsl.SourcePollingChannelAdapterSpec; //导入依赖的package包/类
@Bean
public IntegrationFlow pollingFlow() {
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(jdbcMessageSource(),
new Consumer<SourcePollingChannelAdapterSpec>() {
@Override
public void accept(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec) {
sourcePollingChannelAdapterSpec.poller(poller);
}
});
if (this.properties.isSplit()) {
flowBuilder.split();
}
flowBuilder.channel(this.source.output());
return flowBuilder.get();
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-app-starters,代码行数:18,代码来源:JdbcSourceConfiguration.java
示例2: consumer
import org.springframework.integration.dsl.SourcePollingChannelAdapterSpec; //导入依赖的package包/类
@Bean
IntegrationFlow consumer() {
log.info("starting consumer..");
KafkaHighLevelConsumerMessageSourceSpec messageSourceSpec = Kafka.inboundChannelAdapter(
new ZookeeperConnect(this.kafkaConfig.getZookeeperAddress()))
.consumerProperties(props ->
props.put("auto.offset.reset", "smallest")
.put("auto.commit.interval.ms", "100"))
.addConsumer("myGroup", metadata -> metadata.consumerTimeout(100)
.topicStreamMap(m -> m.put(this.kafkaConfig.getTopic(), 1))
.maxMessages(10)
.valueDecoder(String::new));
Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer = e -> e.poller(p -> p.fixedDelay(100));
return IntegrationFlows
.from(messageSourceSpec, endpointConfigurer)
.<Map<String, List<String>>>handle((payload, headers) -> {
payload.entrySet().forEach(e -> log.info(e.getKey() + '=' + e.getValue()));
return null;
})
.get();
}
开发者ID:joshlong,项目名称:spring-and-kafka,代码行数:26,代码来源:DemoApplication.java
示例3: ftpInboundFlow
import org.springframework.integration.dsl.SourcePollingChannelAdapterSpec; //导入依赖的package包/类
@Bean
public IntegrationFlow ftpInboundFlow(SessionFactory<FTPFile> ftpSessionFactory, FtpSourceProperties properties,
FileConsumerProperties fileConsumerProperties) {
FtpInboundChannelAdapterSpec messageSourceBuilder = Ftp.inboundAdapter(ftpSessionFactory)
.preserveTimestamp(properties.isPreserveTimestamp())
.remoteDirectory(properties.getRemoteDir())
.remoteFileSeparator(properties.getRemoteFileSeparator())
.localDirectory(properties.getLocalDir())
.autoCreateLocalDirectory(properties.isAutoCreateLocalDir())
.temporaryFileSuffix(properties.getTmpFileSuffix())
.deleteRemoteFiles(properties.isDeleteRemoteFiles());
if (StringUtils.hasText(properties.getFilenamePattern())) {
messageSourceBuilder.filter(new FtpSimplePatternFileListFilter(properties.getFilenamePattern()));
}
else if (properties.getFilenameRegex() != null) {
messageSourceBuilder
.filter(new FtpRegexPatternFileListFilter(properties.getFilenameRegex()));
}
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(messageSourceBuilder
, new Consumer<SourcePollingChannelAdapterSpec>() {
@Override
public void accept(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec) {
sourcePollingChannelAdapterSpec
.poller(FtpSourceConfiguration.this.defaultPoller);
}
});
return FileUtils.enhanceFlowForReadingMode(flowBuilder, fileConsumerProperties)
.channel(this.source.output())
.get();
}
开发者ID:spring-cloud-stream-app-starters,项目名称:ftp,代码行数:36,代码来源:FtpSourceConfiguration.java
示例4: sftpInboundFlow
import org.springframework.integration.dsl.SourcePollingChannelAdapterSpec; //导入依赖的package包/类
@Bean
public IntegrationFlow sftpInboundFlow(SessionFactory<LsEntry> sftpSessionFactory, SftpSourceProperties properties,
FileConsumerProperties fileConsumerProperties) {
SftpInboundChannelAdapterSpec messageSourceBuilder = Sftp.inboundAdapter(sftpSessionFactory)
.preserveTimestamp(properties.isPreserveTimestamp())
.remoteDirectory(properties.getRemoteDir())
.remoteFileSeparator(properties.getRemoteFileSeparator())
.localDirectory(properties.getLocalDir())
.autoCreateLocalDirectory(properties.isAutoCreateLocalDir())
.temporaryFileSuffix(properties.getTmpFileSuffix())
.deleteRemoteFiles(properties.isDeleteRemoteFiles());
if (StringUtils.hasText(properties.getFilenamePattern())) {
messageSourceBuilder.filter(new SftpSimplePatternFileListFilter(properties.getFilenamePattern()));
}
else if (properties.getFilenameRegex() != null) {
messageSourceBuilder
.filter(new SftpRegexPatternFileListFilter(properties.getFilenameRegex()));
}
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(messageSourceBuilder
, new Consumer<SourcePollingChannelAdapterSpec>() {
@Override
public void accept(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec) {
sourcePollingChannelAdapterSpec
.poller(SftpSourceConfiguration.this.defaultPoller);
}
});
return FileUtils.enhanceFlowForReadingMode(flowBuilder, fileConsumerProperties)
.channel(this.source.output())
.get();
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-app-starters,代码行数:36,代码来源:SftpSourceConfiguration.java
示例5: fileSourceFlow
import org.springframework.integration.dsl.SourcePollingChannelAdapterSpec; //导入依赖的package包/类
@Bean
public IntegrationFlow fileSourceFlow() {
FileInboundChannelAdapterSpec messageSourceSpec = Files.inboundAdapter(new File(this.properties.getDirectory()));
if (StringUtils.hasText(this.properties.getFilenamePattern())) {
messageSourceSpec.patternFilter(this.properties.getFilenamePattern());
} else if (this.properties.getFilenameRegex() != null) {
messageSourceSpec.regexFilter(this.properties.getFilenameRegex().pattern());
}
if (this.properties.isPreventDuplicates()) {
messageSourceSpec.preventDuplicates();
}
IntegrationFlowBuilder flowBuilder = IntegrationFlows
.from(messageSourceSpec,
new Consumer<SourcePollingChannelAdapterSpec>() {
@Override
public void accept(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec) {
sourcePollingChannelAdapterSpec
.poller(defaultPoller);
}
});
return FileUtils.enhanceFlowForReadingMode(flowBuilder, this.fileConsumerProperties)
.channel(source.output())
.get();
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-app-starters,代码行数:30,代码来源:FileSourceConfiguration.java
示例6: consumer
import org.springframework.integration.dsl.SourcePollingChannelAdapterSpec; //导入依赖的package包/类
@Bean
IntegrationFlow consumer() {
log.info("starting consumer..");
KafkaHighLevelConsumerMessageSourceSpec messageSourceSpec = Kafka
.inboundChannelAdapter(
new ZookeeperConnect(this.kafkaConfig
.getZookeeperAddress()))
.consumerProperties(
props -> props.put("auto.offset.reset", "smallest")
.put("auto.commit.interval.ms", "100"))
.addConsumer(
"myGroup",
metadata -> metadata
.consumerTimeout(100)
.topicStreamMap(
m -> m.put(this.kafkaConfig.getTopic(),
1)).maxMessages(1)
.valueDecoder(String::new));
Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer = e -> e.poller(p -> p.fixedDelay(100));
return IntegrationFlows
.from(messageSourceSpec, endpointConfigurer)
.<Map<String, ConcurrentHashMap<String, String>>> handle(
(payload, headers) -> {
payload.entrySet().forEach(
e -> orderEntryService.createOrderEntryFromJson(e.getValue()));
return null;
}).get();
}
开发者ID:codecentric,项目名称:event-based-shopping-system,代码行数:33,代码来源:CommoditiesReservationConsumerConfiguration.java
示例7: accept
import org.springframework.integration.dsl.SourcePollingChannelAdapterSpec; //导入依赖的package包/类
@Override
public void accept(SourcePollingChannelAdapterSpec spec) {
spec.poller(Pollers.fixedRate(500));
}
开发者ID:vikrammane23,项目名称:https-github.com-g0t4-jenkins2-course-spring-boot,代码行数:5,代码来源:SampleParentContextApplication.java
示例8: getFlowBuilder
import org.springframework.integration.dsl.SourcePollingChannelAdapterSpec; //导入依赖的package包/类
/**
* Method to build Integration Flow for Mail. Suppress Warnings for
* MailInboundChannelAdapterSpec.
* @return Integration Flow object for Mail Source
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
private IntegrationFlowBuilder getFlowBuilder() {
IntegrationFlowBuilder flowBuilder;
URLName urlName = this.properties.getUrl();
if (this.properties.isIdleImap()) {
flowBuilder = getIdleImapFlow(urlName);
}
else {
MailInboundChannelAdapterSpec adapterSpec;
switch (urlName.getProtocol().toUpperCase()) {
case "IMAP":
case "IMAPS":
adapterSpec = getImapFlowBuilder(urlName);
break;
case "POP3":
case "POP3S":
adapterSpec = getPop3FlowBuilder(urlName);
break;
default:
throw new IllegalArgumentException(
"Unsupported mail protocol: " + urlName.getProtocol());
}
flowBuilder = IntegrationFlows.from(
adapterSpec.javaMailProperties(getJavaMailProperties(urlName))
.selectorExpression(this.properties.getExpression())
.shouldDeleteMessages(this.properties.isDelete()),
new Consumer<SourcePollingChannelAdapterSpec>() {
@Override
public void accept(
SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec) {
sourcePollingChannelAdapterSpec.poller(MailSourceConfiguration.this.defaultPoller);
}
});
}
return flowBuilder;
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-app-starters,代码行数:48,代码来源:MailSourceConfiguration.java
注:本文中的org.springframework.integration.dsl.SourcePollingChannelAdapterSpec类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论