• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java ProtonSender类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中io.vertx.proton.ProtonSender的典型用法代码示例。如果您正苦于以下问题:Java ProtonSender类的具体用法?Java ProtonSender怎么用?Java ProtonSender使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



ProtonSender类属于io.vertx.proton包,在下文中一共展示了ProtonSender类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: disconnect

import io.vertx.proton.ProtonSender; //导入依赖的package包/类
@Override
public void disconnect() {

    this.vertx.runOnContext(c -> {
        for (ProtonSender sender: this.senders.values()) {
            sender.close();
        }
        for (ProtonReceiver receiver: this.receivers.values()) {
            receiver.close();
        }
        this.senders.clear();
        this.receivers.clear();
        this.connection.close();

        log.info("Disconnected");
    });
}
 
开发者ID:EnMasseProject,项目名称:enmasse-workshop,代码行数:18,代码来源:AmqpClient.java


示例2: send

import io.vertx.proton.ProtonSender; //导入依赖的package包/类
@Override
public void send(String address, byte[] data, Handler<String> sendCompletionHandler) {

    this.vertx.runOnContext(c -> {
        ProtonSender sender = this.senders.get(address);
        if (sender == null) {

            sender = this.connection.createSender(address);
            sender.openHandler(done ->{

                if (done.succeeded()) {
                    this.sendInternal(done.result(), address, data, sendCompletionHandler);
                } else {
                    log.error("Error opening the sender link on {}", address);
                }

            });
            sender.open();
            this.senders.put(address, sender);

        } else {

            this.sendInternal(sender, address, data, sendCompletionHandler);
        }
    });
}
 
开发者ID:EnMasseProject,项目名称:enmasse-workshop,代码行数:27,代码来源:AmqpClient.java


示例3: sendInternal

import io.vertx.proton.ProtonSender; //导入依赖的package包/类
private void sendInternal(ProtonSender sender, String address, byte[] data, Handler<String> sendCompletionHandler) {

        Message msg = ProtonHelper.message();
        msg.setBody(new Data(new Binary(data)));
        msg.setAddress(address);

        if (sender.isOpen()) {

            sender.send(msg, delivery -> {

                if (sendCompletionHandler != null) {
                    sendCompletionHandler.handle(new String(delivery.getTag()));
                }
            });
        }
    }
 
开发者ID:EnMasseProject,项目名称:enmasse-workshop,代码行数:17,代码来源:AmqpClient.java


示例4: onLinkAttach

import io.vertx.proton.ProtonSender; //导入依赖的package包/类
@Override
public final void onLinkAttach(final ProtonConnection con, final ProtonSender sender, final ResourceIdentifier targetResource) {

    if (ProtonQoS.AT_LEAST_ONCE.equals(sender.getRemoteQoS())) {
        HonoUser user = Constants.getClientPrincipal(con);
        sender.setQoS(ProtonQoS.AT_LEAST_ONCE).open();
        logger.debug("transferring token to client...");
        Message tokenMsg = ProtonHelper.message(user.getToken());
        MessageHelper.addProperty(tokenMsg, AuthenticationConstants.APPLICATION_PROPERTY_TYPE, AuthenticationConstants.TYPE_AMQP_JWT);
        sender.send(tokenMsg, disposition -> {
            if (disposition.remotelySettled()) {
                logger.debug("successfully transferred auth token to client");
            } else {
                logger.debug("failed to transfer auth token to client");
            }
            sender.close();
        });
    } else {
        onLinkDetach(sender, ProtonHelper.condition(AmqpError.INVALID_FIELD, "supports AT_LEAST_ONCE delivery mode only"));
    }
}
 
开发者ID:eclipse,项目名称:hono,代码行数:22,代码来源:AuthenticationEndpoint.java


示例5: createSender

import io.vertx.proton.ProtonSender; //导入依赖的package包/类
@Override
public Future<ProtonSender> createSender(
        final ProtonConnection connection,
        final ResourceIdentifier address,
        final ProtonQoS qos,
        final Handler<ProtonSender> sendQueueDrainHandler) {

    Objects.requireNonNull(connection);
    Objects.requireNonNull(address);
    Objects.requireNonNull(qos);

    if (connection.isDisconnected()) {
        return Future.failedFuture("connection is disconnected");
    } else {
        return newSession(connection, address).compose(session -> {
            return newSender(connection, session, address, qos, sendQueueDrainHandler);
        });
    }
}
 
开发者ID:eclipse,项目名称:hono,代码行数:20,代码来源:SenderFactoryImpl.java


示例6: handleFlow

import io.vertx.proton.ProtonSender; //导入依赖的package包/类
/**
 * Invoked when a downstream sender receives link credit and/or a drain request from the downstream container.
 * <p>
 * The credits/drain request is forwarded to the corresponding upstream client.
 * 
 * @param replenishedSender The downstream sender that has received the FLOW.
 * @param client The upstream client associated with the sender.
 */
public final void handleFlow(
        final ProtonSender replenishedSender,
        final UpstreamReceiver client) {

    logger.trace("received FLOW from downstream container [con:{}, link: {}, sendQueueFull: {}, credits: {}, queued: {}, drain: {}",
            client.getConnectionId(), client.getLinkId(), replenishedSender.sendQueueFull(), replenishedSender.getCredit(),
            replenishedSender.getQueued(), replenishedSender.getDrain());
    if (replenishedSender.getDrain()) {
        // send drain request upstream and act upon result of request to drain upstream client
        client.drain(10000, drainAttempt -> {
            if (drainAttempt.succeeded()) {
                replenishedSender.drained();
            }
        });
    } else {
        int downstreamCredit = getAvailableDownstreamCredit(replenishedSender);
        client.replenish(downstreamCredit);
        metrics.submitDownstreamLinkCredits(client.getTargetAddress(), downstreamCredit);
    }
}
 
开发者ID:eclipse,项目名称:hono,代码行数:29,代码来源:ForwardingDownstreamAdapter.java


示例7: testHandleFlowForwardsDrainRequestUpstream

import io.vertx.proton.ProtonSender; //导入依赖的package包/类
/**
 * Verifies that <em>drain</em> requests received from the downstream container are forwarded
 * to upstream clients.
 */
@SuppressWarnings("unchecked")
@Test
public void testHandleFlowForwardsDrainRequestUpstream() {

    final UpstreamReceiver client = newClient();
    when(client.getTargetAddress()).thenReturn(targetAddress.toString());
    final ProtonSender drainingSender = newMockSender(true);

    // GIVEN an adapter with a connection to the downstream container and a client attached
    givenADownstreamAdapter();
    adapter.setDownstreamConnectionFactory(connectionFactory);
    adapter.start(Future.future());
    adapter.addSender(client, drainingSender);

    // WHEN the downstream sender drains the adapter
    adapter.handleFlow(drainingSender, client);

    // THEN assert that the upstream client has been drained
    verify(client).drain(anyInt(), any(Handler.class));
}
 
开发者ID:eclipse,项目名称:hono,代码行数:25,代码来源:ForwardingDownstreamAdapterTest.java


示例8: testOnClientDisconnectClosesDownstreamSenders

import io.vertx.proton.ProtonSender; //导入依赖的package包/类
/**
 * Verifies that corresponding sender links to the downstream container are closed when
 * a connection to an upstream client is lost/closed.
 */
@Test
public void testOnClientDisconnectClosesDownstreamSenders() {

    final String upstreamConnection = "upstream-connection-id";
    final String linkId = "link-id";
    final UpstreamReceiver client = newClient(linkId, upstreamConnection);
    final ProtonSender downstreamSender = newMockSender(false);

    givenADownstreamAdapter(downstreamSender);
    adapter.setDownstreamConnectionFactory(connectionFactory);
    adapter.start(Future.future());
    adapter.addSender(client, downstreamSender);

    // WHEN the upstream client disconnects
    adapter.onClientDisconnect(upstreamConnection);

    // THEN the downstream sender is closed and removed from the sender list
    verify(downstreamSender).close();
    assertTrue(adapter.isActiveSendersEmpty());
    assertTrue(adapter.isSendersPerConnectionEmpty());
}
 
开发者ID:eclipse,项目名称:hono,代码行数:26,代码来源:ForwardingDownstreamAdapterTest.java


示例9: testDownstreamDisconnectClosesUpstreamReceivers

import io.vertx.proton.ProtonSender; //导入依赖的package包/类
/**
 * Verifies that all links to upstream clients are closed when the connection to the
 * downstream container is lost.
 */
@Test
public void testDownstreamDisconnectClosesUpstreamReceivers() {

    final UpstreamReceiver client = newClient();
    final ProtonSender downstreamSender = newMockSender(false);
    // expect the connection factory to be invoked twice
    // first on initial connection
    // second on re-connect attempt
    HandlerCapturingConnectionFactory factory = new HandlerCapturingConnectionFactory(con, 2);

    // GIVEN an adapter connected to a downstream container
    givenADownstreamAdapter(downstreamSender);
    adapter.setDownstreamConnectionFactory(factory);
    adapter.start(Future.future());
    adapter.addSender(client, downstreamSender);

    // WHEN the downstream connection fails
    factory.getDisconnectHandler().handle(con);

    // THEN the adapter tries to reconnect to the downstream container and has closed all upstream receivers
    factory.await(1, TimeUnit.SECONDS);
    verify(client).close(any(ErrorCondition.class));
    assertTrue(adapter.isActiveSendersEmpty());
    assertTrue(adapter.isSendersPerConnectionEmpty());
}
 
开发者ID:eclipse,项目名称:hono,代码行数:30,代码来源:ForwardingDownstreamAdapterTest.java


示例10: givenADownstreamAdapter

import io.vertx.proton.ProtonSender; //导入依赖的package包/类
private void givenADownstreamAdapter(final SenderFactory senderFactory) {

        adapter = new ForwardingDownstreamAdapter(vertx, senderFactory) {

            @Override
            protected ProtonQoS getDownstreamQos() {
                return ProtonQoS.AT_MOST_ONCE;
            }

            @Override
            protected void forwardMessage(final ProtonSender sender, final Message msg, final ProtonDelivery delivery) {
                // nothing to do
            }
        };
        adapter.setMetrics(mock(MessagingMetrics.class));
    }
 
开发者ID:eclipse,项目名称:hono,代码行数:17,代码来源:ForwardingDownstreamAdapterTest.java


示例11: newMockSender

import io.vertx.proton.ProtonSender; //导入依赖的package包/类
@SuppressWarnings("unchecked")
public static ProtonSender newMockSender(final boolean drainFlag) {
    @SuppressWarnings("rawtypes")
    ArgumentCaptor<Handler> drainHandlerCaptor = ArgumentCaptor.forClass(Handler.class);
    Record attachments = mock(Record.class);
    ProtonSender sender = mock(ProtonSender.class);
    when(sender.attachments()).thenReturn(attachments);
    when(sender.isOpen()).thenReturn(Boolean.TRUE);
    when(sender.getCredit()).thenReturn(DEFAULT_CREDITS);
    when(sender.getQueued()).thenReturn(0);
    when(sender.getDrain()).thenReturn(drainFlag);
    when(sender.open()).then(invocation -> {
        drainHandlerCaptor.getValue().handle(sender);
        return sender;
    });
    when(sender.sendQueueDrainHandler(drainHandlerCaptor.capture())).then(invocation -> {
        return sender;
    });
    Target target = new Target();
    target.setAddress(DEFAULT_ADDRESS);
    when(sender.getTarget()).thenReturn(target);
    return sender;
}
 
开发者ID:eclipse,项目名称:hono,代码行数:24,代码来源:TestSupport.java


示例12: getClient

import io.vertx.proton.ProtonSender; //导入依赖的package包/类
private AbstractRequestResponseClient<SimpleRequestResponseResult> getClient(final String tenant, final ProtonSender sender, final ProtonReceiver receiver) {

        return new AbstractRequestResponseClient<SimpleRequestResponseResult>(context, new ClientConfigProperties(), tenant, sender, receiver) {

            @Override
            protected String getName() {
                return "peer";
            }

            @Override
            protected String createMessageId() {
                return MESSAGE_ID;
            }

            @Override
            protected SimpleRequestResponseResult getResult(int status, String payload) {
                return SimpleRequestResponseResult.from(status, payload);
            }
        };
    }
 
开发者ID:eclipse,项目名称:hono,代码行数:21,代码来源:AbstractRequestResponseClientTest.java


示例13: setupAmqpEndpoits

import io.vertx.proton.ProtonSender; //导入依赖的package包/类
/**
 * Setup all AMQP endpoints
 */
private void setupAmqpEndpoits() {

    // NOTE : Last Will and Testament Service endpoint is opened only if MQTT client provides will information
    //        The receiver on the unique client publish address will be opened only after
    //        connection is established (and CONNACK sent to the MQTT client)

    // setup and open AMQP endpoint for receiving on unique client control/publish addresses
    ProtonReceiver receiverControl = this.connection.createReceiver(String.format(AmqpReceiverEndpoint.CLIENT_CONTROL_ENDPOINT_TEMPLATE, this.mqttEndpoint.clientIdentifier()));
    ProtonReceiver receiverPublish = this.connection.createReceiver(String.format(AmqpReceiverEndpoint.CLIENT_PUBLISH_ENDPOINT_TEMPLATE, this.mqttEndpoint.clientIdentifier()));
    this.rcvEndpoint = new AmqpReceiverEndpoint(new AmqpReceiver(receiverControl, receiverPublish));

    // setup and open AMQP endpoint to Subscription Service
    ProtonSender ssSender = this.connection.createSender(AmqpSubscriptionServiceEndpoint.SUBSCRIPTION_SERVICE_ENDPOINT);
    this.ssEndpoint = new AmqpSubscriptionServiceEndpoint(ssSender);

    // setup and open AMQP endpoint for publishing
    ProtonSender senderPubrel = this.connection.createSender(String.format(AmqpPublishEndpoint.AMQP_CLIENT_PUBREL_ENDPOINT_TEMPLATE, this.mqttEndpoint.clientIdentifier()));
    this.pubEndpoint = new AmqpPublishEndpoint(senderPubrel);

    this.rcvEndpoint.openControl();
    this.ssEndpoint.open();
    this.pubEndpoint.open();
}
 
开发者ID:EnMasseProject,项目名称:enmasse,代码行数:27,代码来源:AmqpBridge.java


示例14: send

import io.vertx.proton.ProtonSender; //导入依赖的package包/类
public void send(String address, List<Message> messages, long timeout, TimeUnit timeUnit) throws InterruptedException {
    ProtonClient client = ProtonClient.create(vertx);
    CountDownLatch latch = new CountDownLatch(1);
    Queue<Message> messageQueue = new ArrayDeque<>(messages);
    client.connect(host, port, connectEvent -> {
        if (connectEvent.succeeded()) {
            ProtonConnection connection = connectEvent.result();
            connection.open();

            ProtonSender sender = connection.createSender(address);
            sender.openHandler(senderOpenEvent -> {
                if (senderOpenEvent.succeeded()) {
                    sendNext(connection, sender, messageQueue, latch);
                }
            });
            sender.open();
        }
    });
    boolean ok = latch.await(timeout, timeUnit);
    if (!ok) {
        throw new RuntimeException("Sending messages timed out, " + messageQueue.size() + " messages unsent");
    }
}
 
开发者ID:EnMasseProject,项目名称:enmasse,代码行数:24,代码来源:BlockingClient.java


示例15: processOpenSender

import io.vertx.proton.ProtonSender; //导入依赖的package包/类
/**
 * Handler for attached link by a remote receiver
 *
 * @param connection	connection which the sender link belong to
 * @param sender		sender link created by the underlying Proton library
 * 						by which handling communication with remote receiver
 */
private void processOpenSender(ProtonConnection connection, ProtonSender sender) {

	log.info("Remote receiver attached {}", sender.getName());
	
	// create and add a new sink to the map
	SinkBridgeEndpoint<?,?> sink = new AmqpSinkBridgeEndpoint<>(this.vertx, this.bridgeConfigProperties);

	sink.closeHandler(s -> {
		this.endpoints.get(connection).getSinks().remove(s);
	});
	sink.open();
	this.endpoints.get(connection).getSinks().add(sink);

	sink.handle(new AmqpEndpoint(sender));
}
 
开发者ID:strimzi,项目名称:amqp-kafka-bridge,代码行数:23,代码来源:AmqpBridge.java


示例16: filters_nonIntegerPartitionFilter

import io.vertx.proton.ProtonSender; //导入依赖的package包/类
/** When happens when partition filter is not an Integer? */
@Test
public <K, V> void filters_nonIntegerPartitionFilter() throws Exception {
	String topic = "my_topic";
	Vertx vertx = Vertx.vertx();
	AmqpSinkBridgeEndpoint<K,V> endpoint = new AmqpSinkBridgeEndpoint<K,V>(vertx, new AmqpBridgeConfigProperties());
	endpoint.open();
	ProtonSender mockSender = mockSender(ProtonQoS.AT_MOST_ONCE, topic+"/group.id/blah");
	// Call handle()
	Map<Symbol, Object> filter = new HashMap<>();
	filter.put(Symbol.getSymbol(AmqpBridge.AMQP_PARTITION_FILTER), "not an integer");
	filter.put(Symbol.getSymbol(AmqpBridge.AMQP_OFFSET_FILTER), 10L);
	((Source)mockSender.getRemoteSource()).setFilter(filter);
	endpoint.handle(new AmqpEndpoint(mockSender));
	
	assertDetach(mockSender, 
			AmqpBridge.AMQP_ERROR_WRONG_PARTITION_FILTER,
			"Wrong partition filter");
}
 
开发者ID:strimzi,项目名称:amqp-kafka-bridge,代码行数:20,代码来源:AmqpSinkBridgeEndpointMockTest.java


示例17: filters_nonLongOffsetFilter

import io.vertx.proton.ProtonSender; //导入依赖的package包/类
/** When happens when offset filter is not a Long? */
@Test
public <K, V> void filters_nonLongOffsetFilter() throws Exception {
	String topic = "my_topic";
	Vertx vertx = Vertx.vertx();
	AmqpSinkBridgeEndpoint<K,V> endpoint = new AmqpSinkBridgeEndpoint<K,V>(vertx, new AmqpBridgeConfigProperties());
	endpoint.open();
	ProtonSender mockSender = mockSender(ProtonQoS.AT_MOST_ONCE, topic+"/group.id/blah");
	// Call handle()
	Map<Symbol, Object> filter = new HashMap<>();
	filter.put(Symbol.getSymbol(AmqpBridge.AMQP_PARTITION_FILTER), 0);
	filter.put(Symbol.getSymbol(AmqpBridge.AMQP_OFFSET_FILTER), "not a long");
	((Source)mockSender.getRemoteSource()).setFilter(filter);
	endpoint.handle(new AmqpEndpoint(mockSender));
	
	assertDetach(mockSender, 
			// TODO really?
			AmqpBridge.AMQP_ERROR_WRONG_OFFSET_FILTER,
			"Wrong offset filter");
}
 
开发者ID:strimzi,项目名称:amqp-kafka-bridge,代码行数:21,代码来源:AmqpSinkBridgeEndpointMockTest.java


示例18: filters_negativeIntegerPartitionFilter

import io.vertx.proton.ProtonSender; //导入依赖的package包/类
/** When happens when the partition filter &lt; 0? */
@Test
public <K, V> void filters_negativeIntegerPartitionFilter() throws Exception {
	String topic = "my_topic";
	Vertx vertx = Vertx.vertx();
	AmqpSinkBridgeEndpoint<K,V> endpoint = new AmqpSinkBridgeEndpoint<K,V>(vertx, new AmqpBridgeConfigProperties());
	endpoint.open();
	ProtonSender mockSender = mockSender(ProtonQoS.AT_MOST_ONCE, topic+"/group.id/blah");
	// Call handle()
	Map<Symbol, Object> filter = new HashMap<>();
	filter.put(Symbol.getSymbol(AmqpBridge.AMQP_PARTITION_FILTER), -1);
	filter.put(Symbol.getSymbol(AmqpBridge.AMQP_OFFSET_FILTER), 10L);
	((Source)mockSender.getRemoteSource()).setFilter(filter);
	endpoint.handle(new AmqpEndpoint(mockSender));
	
	ArgumentCaptor<ErrorCondition> errorCap = ArgumentCaptor.forClass(ErrorCondition.class);
	verify(mockSender).setCondition(errorCap.capture());
	verify(mockSender).close();
	
	assertDetach(mockSender, 
			AmqpBridge.AMQP_ERROR_WRONG_FILTER,
			"Wrong filter");
}
 
开发者ID:strimzi,项目名称:amqp-kafka-bridge,代码行数:24,代码来源:AmqpSinkBridgeEndpointMockTest.java


示例19: filters_negativeLongOffsetFilter

import io.vertx.proton.ProtonSender; //导入依赖的package包/类
/** When happens when the offset filter is &lt; 0? */
@Test
public <K, V> void filters_negativeLongOffsetFilter() throws Exception {
	String topic = "my_topic";
	Vertx vertx = Vertx.vertx();
	AmqpSinkBridgeEndpoint<K,V> endpoint = new AmqpSinkBridgeEndpoint<K,V>(vertx, new AmqpBridgeConfigProperties());
	endpoint.open();
	ProtonSender mockSender = mockSender(ProtonQoS.AT_MOST_ONCE, topic+"/group.id/blah");
	// Call handle()
	Map<Symbol, Object> filter = new HashMap<>();
	filter.put(Symbol.getSymbol(AmqpBridge.AMQP_PARTITION_FILTER), 0);
	filter.put(Symbol.getSymbol(AmqpBridge.AMQP_OFFSET_FILTER), -10L);
	((Source)mockSender.getRemoteSource()).setFilter(filter);
	endpoint.handle(new AmqpEndpoint(mockSender));
	
	assertDetach(mockSender, 
			AmqpBridge.AMQP_ERROR_WRONG_FILTER,
			"Wrong filter");
}
 
开发者ID:strimzi,项目名称:amqp-kafka-bridge,代码行数:20,代码来源:AmqpSinkBridgeEndpointMockTest.java


示例20: filters_offsetFilterButNoPartitionFilter

import io.vertx.proton.ProtonSender; //导入依赖的package包/类
/** When happens when there's a filter for offset, but not for partition? */
@Test
public <K, V> void filters_offsetFilterButNoPartitionFilter() throws Exception {
	String topic = "my_topic";
	Vertx vertx = Vertx.vertx();
	AmqpSinkBridgeEndpoint<K,V> endpoint = new AmqpSinkBridgeEndpoint<K,V>(vertx, new AmqpBridgeConfigProperties());
	endpoint.open();
	ProtonSender mockSender = mockSender(ProtonQoS.AT_MOST_ONCE, topic+"/group.id/blah");
	// Call handle()
	Map<Symbol, Object> filter = new HashMap<>();
	//filter.put(Symbol.getSymbol(Bridge.AMQP_PARTITION_FILTER), 0);
	filter.put(Symbol.getSymbol(AmqpBridge.AMQP_OFFSET_FILTER), 10L);
	((Source)mockSender.getRemoteSource()).setFilter(filter);
	endpoint.handle(new AmqpEndpoint(mockSender));
	
	assertDetach(mockSender, 
			AmqpBridge.AMQP_ERROR_NO_PARTITION_FILTER,
			"No partition filter specified");
}
 
开发者ID:strimzi,项目名称:amqp-kafka-bridge,代码行数:20,代码来源:AmqpSinkBridgeEndpointMockTest.java



注:本文中的io.vertx.proton.ProtonSender类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java IContext类代码示例发布时间:2022-05-22
下一篇:
Java MimeLookup类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap