
Java StreamCallback Class Code Examples


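This article collects typical usage examples of the Java class org.wso2.siddhi.core.stream.output.StreamCallback. If you are wondering how StreamCallback is used in practice, the selected examples below may help.

The StreamCallback class belongs to the org.wso2.siddhi.core.stream.output package. The 20 code examples below are sorted by popularity by default. In each one, an anonymous StreamCallback subclass overrides receive(Event[] events) and is registered on a named stream through addCallback. The examples come from different Siddhi versions, so the callback is registered on SiddhiManager, ExecutionPlanRuntime, or SiddhiAppRuntime depending on the project.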

Example 1: main

import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
    public static void main(String[] args) throws InterruptedException, SiddhiParserException {

        // Create Siddhi Manager
        SiddhiManager siddhiManager = new SiddhiManager();

        siddhiManager.defineStream("define stream cseEventStream ( symbol string, price float, volume int )");
        siddhiManager.addQuery("from  cseEventStream [ price >= 50 ] " +
                               "select symbol, price "+
                               "insert into StockQuote ;");

        siddhiManager.addCallback("StockQuote", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                EventPrinter.print(events);
            }
        });
        InputHandler inputHandler = siddhiManager.getInputHandler("cseEventStream");
        inputHandler.send(new Object[]{"IBM", 75.6f, 100});
        inputHandler.send(new Object[]{"GOOG", 50f, 100});
        inputHandler.send(new Object[]{"IBM", 76.6f, 100});
        inputHandler.send(new Object[]{"WSO2", 45.6f, 100});
        Thread.sleep(500);

        siddhiManager.shutdown();
    }
 
Developer: redBorder, Project: rb-bi, Lines of code: 26, Source: SimpleFilterSample.java


Example 2: getCallback

import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
/**
 * Gets the callback that Siddhi will use to output messages
 *
 * @param streamName The stream that produced the message
 * @param topic The destination topic for that message
 * @param attributes Attributes from the message, which stores both attributes names and values.
 * @return A StreamCallback that Siddhi will use to send events
 */

public StreamCallback getCallback(final String streamName, final String topic, final List<Attribute> attributes) {
    return new StreamCallback() {
        @Override
        public void receive(Event[] events) {
            for (Event event : events) {
                // This map will store the final message
                Map<String, Object> result = new HashMap<>();

                // Get all the attributes values from the list of attributes
                int index = 0;
                for (Object object : event.getData()) {
                    String columnName = attributes.get(index++).getName();
                    result.put(columnName, object);
                }

                // Send the message to every sink
                sinksManager.process(streamName, topic, result);
            }
        }
    };
}
 
Developer: redBorder, Project: cep, Lines of code: 31, Source: SiddhiCallback.java
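
The loop inside receive() in Example 2 pairs each value in the event's data array with the attribute name at the same index. A minimal, library-free sketch of that pairing follows. It assumes a plain list of names stands in for the Siddhi Attribute list and an Object[] stands in for Event.getData(); the AttributeZip class and its zip method are hypothetical names introduced only for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: pairs attribute names with event values by position,
// mirroring the loop inside receive() in Example 2.
class AttributeZip {

    // names stands in for the Attribute list, data for Event.getData().
    static Map<String, Object> zip(List<String> names, Object[] data) {
        if (names.size() != data.length) {
            // Assumption: a length mismatch is rejected up front;
            // the original loop does not perform this check.
            throw new IllegalArgumentException(
                    "Expected " + names.size() + " values but got " + data.length);
        }
        // LinkedHashMap keeps the entries in stream-definition order.
        Map<String, Object> result = new LinkedHashMap<>();
        for (int i = 0; i < data.length; i++) {
            result.put(names.get(i), data[i]);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> message = zip(
                Arrays.asList("symbol", "price", "volume"),
                new Object[]{"IBM", 75.6f, 100});
        System.out.println(message);
    }
}
```

Example 2 uses a HashMap, which does not preserve attribute order; the LinkedHashMap here is a design choice of the sketch, useful when the resulting message is printed or serialized.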


Example 3: start

import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
/**
 * Starts the siddhi execution plan
 *
 * @param siddhiManager The manager that will manage the execution plan
 * @param siddhiCallback The callback to be called when the execution plan creates a new message
 * @throws ExecutionPlanException if a configured output stream is missing from the execution plan
 */

public void start(SiddhiManager siddhiManager, SiddhiCallback siddhiCallback) throws ExecutionPlanException {
    this.executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(getFullExecutionPlan());

    for (Map.Entry<String, String> entry : outputTopics.entrySet()) {
        String streamName = entry.getKey();
        String topic = entry.getValue();

        AbstractDefinition abstractDefinition = executionPlanRuntime.getStreamDefinitionMap().get(streamName);
        if (abstractDefinition != null) {
            List<Attribute> attributes = abstractDefinition.getAttributeList();
            StreamCallback streamCallback = siddhiCallback.getCallback(streamName, topic, attributes);
            executionPlanRuntime.addCallback(streamName, streamCallback);
        } else {
            throw new InvalidExecutionPlanException("You specified an output that is not present in the execution plan");
        }
    }

    executionPlanRuntime.start();
    log.info("Started execution plan with id {} version {}", id, version);
}
 
Developer: redBorder, Project: cep, Lines of code: 29, Source: SiddhiPlan.java


Example 4: createTestExecutionRuntime

import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
private SiddhiAppRuntime createTestExecutionRuntime() {
    siddhiManager = new SiddhiManager();
    String siddhiApp = "" +
            "@app:name('callbackTest1') " +
            "" +
            "@async(buffer.size='2')" +
            "define stream StockStream (symbol string, price float, volume long);" +
            "" +
            "@info(name = 'query1') " +
            "@Parallel " +
            "from StockStream[price + 0.0 > 0.0] " +
            "select symbol, price " +
            "insert into outputStream;";

    SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
    siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
        @Override
        public void receive(Event[] events) {
            EventPrinter.print(events);
            count.addAndGet(events.length);
            eventArrived = true;
        }
    });
    return siddhiAppRuntime;
}
 
Developer: wso2, Project: siddhi, Lines of code: 26, Source: ExceptionHandlerTestCase.java


Example 5: distinctFilterTest

import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
@Test
public void distinctFilterTest() throws InterruptedException {
    sm.defineStream("define stream testStream (c1 string, c2 float, c3 int);");
    sm.addQuery("from testStream #window.stratio:distinct(c1) select c1, c2,c3 insert into resultStream;");

    sm.addCallback("resultStream", new StreamCallback() {

        @Override
        public void receive(Event[] events) {
            for (Event event : events) {
                if (event instanceof InEvent) {
                    count.getAndIncrement();
                }
            }
        }
    });
    // Literals replace the deprecated boxing constructors; the values sent are unchanged
    sm.getInputHandler("testStream").send(new Object[] { "KEY_A", 10f, 20 });
    sm.getInputHandler("testStream").send(new Object[] { "KEY_A", 20f, 30 });
    sm.getInputHandler("testStream").send(new Object[] { "KEY_A", 30f, 40 });
    sm.getInputHandler("testStream").send(new Object[] { "KEY_B", 30f, 40 });

    Thread.sleep(500);
    assertEquals(2, count.get());
}
 
Developer: Stratio, Project: Decision, Lines of code: 25, Source: DistinctWindowTest.java


Example 6: main

import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
public static void main(String[] args) throws InterruptedException {
    SiddhiManager siddhiManager = new SiddhiManager();

    String cseEventStream = "define stream cseEventStream (symbol string, price float, volume long);";
    String query = "from cseEventStream[70 > price] select symbol,price,volume insert into outputStream ;";
    siddhiManager.defineStream(cseEventStream);
    siddhiManager.addQuery(query);
    siddhiManager.addCallback("outputStream", new StreamCallback() {
        @Override
        public void receive(Event[] events) {
            count++;
            if (count % 10000000 == 0) {
                long end = System.currentTimeMillis();
                double tp = (10000000 * 1000.0 / (end - start));
                System.out.println("Throughput = " + tp + " Event/sec");
                start = end;
            }
        }
    });

    InputHandler inputHandler = siddhiManager.getInputHandler("cseEventStream");
    while (true) {
        inputHandler.send(new Object[]{"WSO2", 55.6f, 100});
        inputHandler.send(new Object[]{"IBM", 75.6f, 100});
        inputHandler.send(new Object[]{"WSO2", 55.6f, 100});
        inputHandler.send(new Object[]{"IBM", 75.6f, 100});
        inputHandler.send(new Object[]{"WSO2", 55.6f, 100});
        inputHandler.send(new Object[]{"IBM", 75.6f, 100});
        inputHandler.send(new Object[]{"WSO2", 55.6f, 100});
        inputHandler.send(new Object[]{"IBM", 75.6f, 100});
        inputHandler.send(new Object[]{"WSO2", 55.6f, 100});
        inputHandler.send(new Object[]{"IBM", 75.6f, 100});
    }
}
 
Developer: redBorder, Project: rb-bi, Lines of code: 35, Source: SimpleFilterSingleQueryPerformance.java
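
The callback in Example 6 reports throughput once per fixed-size batch as batchSize * 1000.0 / elapsedMillis. A minimal sketch of that arithmetic as a standalone helper follows. The ThroughputMeter class is hypothetical, and the timestamp is passed in as a parameter (an assumption of this sketch) so that the calculation can be checked without Siddhi or a real clock.

```java
// Hypothetical helper reproducing the throughput arithmetic of Example 6.
// Not thread-safe: it assumes a single thread calls record().
class ThroughputMeter {
    private final long batchSize;
    private long count;
    private long batchStartMillis;

    ThroughputMeter(long batchSize, long startMillis) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.batchStartMillis = startMillis;
    }

    // Records one observation made at nowMillis. Returns events per second
    // when a batch completes, or -1 while the batch is still filling.
    double record(long nowMillis) {
        count++;
        if (count % batchSize != 0) {
            return -1;
        }
        // Clamp to 1 ms so a very fast batch does not report Infinity.
        long elapsed = Math.max(1, nowMillis - batchStartMillis);
        batchStartMillis = nowMillis;
        return batchSize * 1000.0 / elapsed;
    }

    public static void main(String[] args) {
        ThroughputMeter meter = new ThroughputMeter(1_000_000, System.currentTimeMillis());
        for (int i = 0; i < 3_000_000; i++) {
            double tp = meter.record(System.currentTimeMillis());
            if (tp >= 0) {
                System.out.println("Throughput = " + tp + " Event/sec");
            }
        }
    }
}
```

The callback in Example 6 increments its counter once per receive() call. If a single call can deliver several events, adding events.length instead would count events and not invocations.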


Example 7: main

import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
public static void main(String[] args) throws InterruptedException {
    SiddhiManager siddhiManager = new SiddhiManager();

    String cseEventStream = " define stream cseEventStream (symbol string, price float, volume int);";
    String query1 = "from cseEventStream[70 > price] select symbol,price,volume insert into outputStream ;";
    String query2 = "from cseEventStream[volume > 90] select symbol,price,volume insert into outputStream ;";

    siddhiManager.defineStream(cseEventStream);
    siddhiManager.addQuery(query1);
    siddhiManager.addQuery(query2);
    siddhiManager.addCallback("outputStream", new StreamCallback() {
        @Override
        public void receive(Event[] events) {
            count++;
            if (count % 1000000 == 0) {
                long end = System.currentTimeMillis();
                double tp = (1000000 * 1000.0 / (end - start));
                System.out.println("Throughput = " + tp + " Event/sec");
                start = end;
            }
        }
    });

    InputHandler inputHandler = siddhiManager.getInputHandler("cseEventStream");
    while (true) {
        inputHandler.send(new Object[]{"WSO2", 55.6f, 100});
        inputHandler.send(new Object[]{"IBM", 75.6f, 100});
        inputHandler.send(new Object[]{"WSO2", 100f, 80});
        inputHandler.send(new Object[]{"IBM", 75.6f, 100});
        inputHandler.send(new Object[]{"WSO2", 55.6f, 100});
        inputHandler.send(new Object[]{"IBM", 75.6f, 100});
        inputHandler.send(new Object[]{"WSO2", 100f, 80});
        inputHandler.send(new Object[]{"IBM", 75.6f, 100});
    }
}
 
Developer: redBorder, Project: rb-bi, Lines of code: 36, Source: SimpleFilterMultipleQueryPerformance.java


Example 8: testQuery1

import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
    @Test
    public void testQuery1() throws InterruptedException {

        log.info("StreamCallback test1");
        SiddhiManager siddhiManager = new SiddhiManager();

        InputHandler inputHandler = siddhiManager.defineStream(
                QueryFactory.createStreamDefinition()
                        .name("cseEventStream")
                        .attribute("symbol", Attribute.Type.STRING)
                        .attribute("price", Attribute.Type.FLOAT)
                        .attribute("volume", Attribute.Type.INT));

        Query query = QueryFactory.createQuery();
        query.from(QueryFactory.inputStream("cseEventStream"));
        query.select(
                QueryFactory.outputSelector().
                        select("symbol", Expression.variable("symbol")).
                        select("price", Expression.variable("price")).
                        select("volume", Expression.variable("volume"))
        );
        query.insertInto("StockQuote");

        String queryReference = siddhiManager.addQuery(query);

        siddhiManager.addCallback("StockQuote", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                EventPrinter.print(events);
                Assert.assertTrue("IBM".equals(events[0].getData(0)) || "WSO2".equals(events[0].getData(0)));
                count++;
            }
        });
//        InputHandler inputHandler = siddhiManager.getInputHandler("cseEventStream");
        inputHandler.send(new Object[]{"IBM", 75.6f, 100});
        inputHandler.send(new Object[]{"WSO2", 75.6f, 100});
        Thread.sleep(500);
        siddhiManager.shutdown();
        Assert.assertEquals(2, count);

    }
 
Developer: redBorder, Project: rb-bi, Lines of code: 37, Source: CallbackTestCase.java


Example 9: testTimeBatchAndSequence

import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
@Test
public void testTimeBatchAndSequence() throws Exception {
    log.info("testTimeBatchAndSequence  OUT 1");
    SiddhiManager siddhi = new SiddhiManager();
    InputHandler i1 = siddhi.defineStream("define stream received_reclamations (timestamp long, product_id string, defect_category string)");

    siddhi.addQuery("from received_reclamations#window.timeBatch(1 sec) " +
            "select product_id, defect_category, count(*) as num group by product_id, defect_category " +
            "insert into reclamation_averages");

    final CountDownLatch increaseEventReceived = new CountDownLatch(1);
    siddhi.addQuery("from a=reclamation_averages[num > 1], b=reclamation_averages[num > a.num and product_id == a.product_id and defect_category == a.defect_category] " +
            "select a.product_id, a.defect_category, a.num as oldNum, b.num as newNum " +
            "insert into increased_reclamations");
    siddhi.addCallback("increased_reclamations", new StreamCallback() {
        @Override
        public void receive(Event[] events) {
            EventPrinter.print(events);
            increaseEventReceived.countDown();
        }
    });

    for (int i = 0; i < 5; i++) {
        i1.send(new Object[]{System.currentTimeMillis(), "abc", "123"});
        Thread.sleep(1);
    }

    Thread.sleep(1000);

    for (int i = 0; i < 8; i++) {
        i1.send(new Object[] { System.currentTimeMillis(), "abc", "123" });
        Thread.sleep(1);
    }

    assertTrue("Did not receive event in stream increased_reclamations", increaseEventReceived.await(1000, TimeUnit.MILLISECONDS));

    siddhi.shutdown();
}
 
Developer: redBorder, Project: rb-bi, Lines of code: 39, Source: SequenceTestCase.java
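
Example 9 waits for the callback with a CountDownLatch and a timeout, so the test proceeds as soon as the event arrives. A self-contained sketch of that pattern follows. It assumes a plain executor thread stands in for Siddhi's asynchronous delivery; the LatchWaitSketch class and its awaitSignal method are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the latch-based wait used in Example 9.
class LatchWaitSketch {

    // Signals the latch from another thread after delayMillis and waits up to
    // timeoutMillis for that signal. Returns true if it arrived in time.
    static boolean awaitSignal(long delayMillis, long timeoutMillis) {
        CountDownLatch received = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.execute(() -> {
                try {
                    Thread.sleep(delayMillis); // simulated processing delay
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                received.countDown(); // the step receive() performs in Example 9
            });
            return received.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow(); // interrupts the worker if it is still sleeping
        }
    }

    public static void main(String[] args) {
        System.out.println("signalled in time: " + awaitSignal(50, 2000));
    }
}
```

Compared with a fixed Thread.sleep, the latch returns as soon as the signal arrives and still fails with a clear result when the timeout expires.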


Example 10: testSnapshotOutputRateLimitQuery6

import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
@Test
public void testSnapshotOutputRateLimitQuery6() throws InterruptedException {
    log.info("SnapshotOutputRateLimit test6");

    SiddhiManager siddhiManager = new SiddhiManager();


    siddhiManager.defineStream("define stream LoginEvents (timeStamp long, ip string) ");

    String queryReference = siddhiManager.addQuery("from LoginEvents#window.time(1 sec) " +
                                                   "select  ip " +
                                                   "output snapshot every 1 sec " +
                                                   "insert into uniqueIps for all-events ;");

    siddhiManager.addCallback("uniqueIps", new StreamCallback() {
        @Override
        public void receive(Event[] events) {
            EventPrinter.print(events);
            count++;
            eventArrived = true;
        }
    });
    InputHandler loginSucceedEvents = siddhiManager.getInputHandler("LoginEvents");

    Thread.sleep(100);
    loginSucceedEvents.send(new Object[]{System.currentTimeMillis(), "192.10.1.5"});
    loginSucceedEvents.send(new Object[]{System.currentTimeMillis(), "192.10.1.3"});
    Thread.sleep(2200);

    Assert.assertTrue("Event arrived", eventArrived);
    Assert.assertEquals("Number of output event value", 1, count);
    siddhiManager.shutdown();
}
 
Developer: redBorder, Project: rb-bi, Lines of code: 34, Source: SnapshotOutputRateLimitTestCase.java


Example 11: testQuery1

import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
    @Test
    public void testQuery1() throws InterruptedException, SiddhiParserException {
        log.info("Remove Query test1");
        SiddhiManager siddhiManager = new SiddhiManager();

        InputHandler inputHandler = siddhiManager.defineStream("define stream cseStream ( symbol string, price float, volume int )");

        siddhiManager.addCallback("outStream", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                EventPrinter.print(events);
                Assert.assertTrue("IBM".equals(events[0].getData(0)) || "WSO2".equals(events[0].getData(0)));
                count++;
            }
        });

        String queryReference = siddhiManager.addQuery("from cseStream[price>10] " +
                                                       "select symbol, price, volume " +
                                                       " having price*12 >100 " +
                                                       "insert into outStream ;");

        inputHandler.send(new Object[]{"IBM", 75.6f, 100});
        inputHandler.send(new Object[]{"WSO2", 75.6f, 100});


        siddhiManager.removeQuery(queryReference);
//        InputHandler inputHandler = siddhiManager.getInputHandler("cseEventStream");
        inputHandler.send(new Object[]{"IBM", 75.6f, 100});
        inputHandler.send(new Object[]{"WSO2", 75.6f, 100});

        siddhiManager.shutdown();

        Assert.assertEquals(2, count);

    }
 
Developer: redBorder, Project: rb-bi, Lines of code: 36, Source: AddRemoveTestCase.java


Example 12: testSimplePlan

import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
@Test
public void testSimplePlan() throws InterruptedException {
	SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime(
		"define stream inStream (name string, value double);"
			+ "from inStream insert into outStream");
	runtime.start();

	final List<Object[]> received = new ArrayList<>(3);
	InputHandler inputHandler = runtime.getInputHandler("inStream");
	Assert.assertNotNull(inputHandler);

	try {
		runtime.getInputHandler("unknownStream");
		Assert.fail("Should throw exception for getting input handler for unknown streamId.");
	} catch (Exception ex) {
		// Expected exception for getting input handler for illegal streamId.
	}

	runtime.addCallback("outStream", new StreamCallback() {
		@Override
		public void receive(Event[] events) {
			for (Event event : events) {
				received.add(event.getData());
			}
		}
	});

	inputHandler.send(new Object[]{"a", 1.1});
	inputHandler.send(new Object[]{"b", 1.2});
	inputHandler.send(new Object[]{"c", 1.3});
	Thread.sleep(100);
	Assert.assertEquals(3, received.size());
	// JUnit expects (expected, actual); this order keeps failure messages accurate
	Assert.assertArrayEquals(new Object[]{"a", 1.1}, received.get(0));
	Assert.assertArrayEquals(new Object[]{"b", 1.2}, received.get(1));
	Assert.assertArrayEquals(new Object[]{"c", 1.3}, received.get(2));
}
 
Developer: haoch, Project: flink-siddhi, Lines of code: 37, Source: SiddhiSyntaxTest.java


Example 13: testSimplePlan

import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
@Test
public void testSimplePlan() throws InterruptedException {
    SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime(
        "define stream inStream (name string, value double);"
            + "from inStream insert into outStream");
    runtime.start();

    final List<Object[]> received = new ArrayList<>(3);
    InputHandler inputHandler = runtime.getInputHandler("inStream");
    Assert.assertNotNull(inputHandler);

    try {
        runtime.getInputHandler("unknownStream");
        Assert.fail("Should throw exception for getting input handler for unknown streamId.");
    } catch (Exception ex) {
        // Expected exception for getting input handler for illegal streamId.
    }

    runtime.addCallback("outStream", new StreamCallback() {
        @Override
        public void receive(Event[] events) {
            for (Event event : events) {
                received.add(event.getData());
            }
        }
    });

    inputHandler.send(new Object[]{"a", 1.1});
    inputHandler.send(new Object[]{"b", 1.2});
    inputHandler.send(new Object[]{"c", 1.3});
    Thread.sleep(100);
    Assert.assertEquals(3, received.size());
    // JUnit expects (expected, actual); this order keeps failure messages accurate
    Assert.assertArrayEquals(new Object[]{"a", 1.1}, received.get(0));
    Assert.assertArrayEquals(new Object[]{"b", 1.2}, received.get(1));
    Assert.assertArrayEquals(new Object[]{"c", 1.3}, received.get(2));
}
 
Developer: apache, Project: bahir-flink, Lines of code: 37, Source: SiddhiSyntaxTest.java


Example 14: testFraudMoreThanOneOrderIn10MinutesWithDifferentCards

import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
@Test
public void testFraudMoreThanOneOrderIn10MinutesWithDifferentCards() throws Exception {
    count.set(0);
    LOGGER.debug("[Fraud] Checking if there are more than one order in 10 minutes with different credit cards");
    LOGGER.debug("--> Creating Order Queries and Loading dataset");
    //createOrdersQueries();
    ORDER_QUERY3_ID = sm.addQuery(OrdersQueries.QUERY_MORE_1_ORDER_IN_10M);

    sm.addCallback(OrdersQueries.STREAM_FRAUD, new StreamCallback() {

        @Override
        public void receive(Event[] events) {
            for (Event event : events) {
                if (event instanceof InEvent && event.getData(6).toString().equals("orders-3")) {
                    count.getAndIncrement();
                    LOGGER.debug("Found event: " + event.toString());
                }
            }
        }
    });

    loadOrders(DATASET_ORDERS2);

    Thread.sleep(500);
    assertEquals(2, count.get());
    sm.removeQuery(ORDER_QUERY3_ID);
}
 
Developer: Stratio, Project: Decision, Lines of code: 28, Source: OrdersQueriesValidationTest.java


Example 15: callbackTest3

import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
@Test(expectedExceptions = DefinitionNotExistException.class)
public void callbackTest3() throws InterruptedException {
    log.info("callback test3");
    SiddhiManager siddhiManager = new SiddhiManager();

    String siddhiApp = "" +
            "@app:name('callbackTest1') " +
            "" +
            "define stream StockStream (symbol string, price float, volume long);" +
            "" +
            "@info(name = 'query1') " +
            "from StockStream[70 > price] " +
            "select symbol, price " +
            "insert into outputStream;";


    SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);

    siddhiAppRuntime.addCallback("outputStream2", new StreamCallback() {
        @Override
        public void receive(Event[] events) {
            EventPrinter.print(events);
        }
    });

    siddhiAppRuntime.shutdown();
}
 
Developer: wso2, Project: siddhi, Lines of code: 28, Source: CallbackTestCase.java


Example 16: inMemoryTestCase7

import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
@Test
public void inMemoryTestCase7() throws InterruptedException {
    log.info("Test inMemory 7");

    String streams = "" +
            "@app:name('TestSiddhiApp')" +
            "@source(type='testTrpInMemory', topic='Foo', prop1='hi', prop2='test', fail='true', " +
            "   @map(type='passThrough', @attributes(symbol='trp:symbol'," +
            "        volume='volume',price='trp:price'))) " +
            "define stream FooStream (symbol string, price string, volume long); " +
            "define stream BarStream (symbol string, price string, volume long); ";

    String query = "" +
            "from FooStream " +
            "select * " +
            "insert into BarStream; ";

    SiddhiManager siddhiManager = new SiddhiManager();
    SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query);

    siddhiAppRuntime.addCallback("BarStream", new StreamCallback() {
        @Override
        public void receive(Event[] events) {
            EventPrinter.print(events);
            wso2Count.incrementAndGet();
            for (Event event : events) {
                AssertJUnit.assertArrayEquals(event.getData(), new Object[]{"hi", "test", 100L});
            }
        }
    });
    siddhiAppRuntime.start();
    InMemoryBroker.publish("Foo", new Event(System.currentTimeMillis(), new Object[]{"WSO2", "in", 100L}));
    Thread.sleep(100);

    //assert event count
    AssertJUnit.assertEquals("Number of events", 0, wso2Count.get());
    siddhiAppRuntime.shutdown();
}
 
Developer: wso2, Project: siddhi, Lines of code: 39, Source: InMemoryTransportTestCase.java


Example 17: testPartitionQuery

import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
@Test
public void testPartitionQuery() throws InterruptedException {
    log.info("Partition test");
    SiddhiManager siddhiManager = new SiddhiManager();

    String siddhiApp = "@app:name('PartitionTest') " +
            "define stream streamA (symbol string, price int);" +
            "partition with (symbol of streamA) " +
            "begin " +
            "@info(name = 'query1') " +
            "from streamA select symbol,price insert into StockQuote ;  " +
            "end ";

    SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);

    StreamCallback streamCallback = new StreamCallback() {
        @Override
        public void receive(Event[] events) {
            EventPrinter.print(events);
            AssertJUnit.assertTrue("IBM".equals(events[0].getData(0)) || "WSO2".equals(events[0].getData(0)));
            count.addAndGet(events.length);
            eventArrived = true;
        }
    };
    siddhiAppRuntime.addCallback("StockQuote", streamCallback);

    InputHandler inputHandler = siddhiAppRuntime.getInputHandler("streamA");
    siddhiAppRuntime.start();
    inputHandler.send(new Object[]{"IBM", 700});
    inputHandler.send(new Object[]{"WSO2", 60});
    inputHandler.send(new Object[]{"WSO2", 60});
    SiddhiTestHelper.waitForEvents(100, 3, count, 60000);
    AssertJUnit.assertTrue(eventArrived);
    AssertJUnit.assertEquals(3, count.get());
    siddhiAppRuntime.shutdown();
}
 
Developer: wso2, Project: siddhi, Lines of code: 37, Source: PartitionTestCase.java
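
Example 17 calls SiddhiTestHelper.waitForEvents(100, 3, count, 60000) to block until the callback has counted the expected number of events. The sketch below shows what such a polling helper could look like. It is not the Siddhi implementation: the EventWaiter class is hypothetical, and the argument meaning (poll interval, expected count, counter, timeout) is an assumption inferred from that call.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical polling helper; the real SiddhiTestHelper may behave differently.
class EventWaiter {

    // Polls actualCount every sleepMillis until it reaches expectedCount or
    // timeoutMillis elapses. Returns true if the count was reached.
    static boolean waitForEvents(long sleepMillis, int expectedCount,
                                 AtomicInteger actualCount, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (actualCount.get() < expectedCount) {
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        AtomicInteger count = new AtomicInteger();
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                count.incrementAndGet(); // stands in for count.addAndGet(events.length)
            }
        });
        producer.start();
        System.out.println("all events seen: " + waitForEvents(10, 3, count, 1000));
    }
}
```

Polling a shared AtomicInteger lets a test finish as soon as the expected events have arrived while still bounding the wait with a timeout.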


Example 18: testPartitionQuery2

import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
@Test
public void testPartitionQuery2() throws InterruptedException {
    log.info("Partition test2");
    SiddhiManager siddhiManager = new SiddhiManager();

    String siddhiApp = "@app:name('PartitionTest2') " +
            "define stream cseEventStream (symbol string, price float,volume int);"
            + "define stream StockStream1 (symbol string, price float,volume int);"
            + "partition with (symbol of cseEventStream , symbol of StockStream1) begin @info(name = 'query1') " +
            "from cseEventStream[700>price] select symbol,sum(price) as price,volume insert into OutStockStream ;" +
            "  end ";


    SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);


    siddhiAppRuntime.addCallback("OutStockStream", new StreamCallback() {
        @Override
        public void receive(Event[] events) {
            EventPrinter.print(events);
            count.addAndGet(events.length);
            eventArrived = true;
        }
    });
    InputHandler inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream");
    siddhiAppRuntime.start();
    inputHandler.send(new Object[]{"IBM", 75.6f, 100});
    inputHandler.send(new Object[]{"WSO2", 75.6f, 100});
    inputHandler.send(new Object[]{"IBM", 75.6f, 100});
    inputHandler.send(new Object[]{"ORACLE", 75.6f, 100});
    SiddhiTestHelper.waitForEvents(100, 4, count, 60000);
    AssertJUnit.assertEquals(4, count.get());
    siddhiAppRuntime.shutdown();
}
 
Developer: wso2, Project: siddhi, Lines of code: 35, Source: PartitionTestCase.java


Example 19: streamCallbackTest

import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
@Test
public void streamCallbackTest() throws InterruptedException {

    log.info("stream callback test");

    ExecutionPlan executionPlan = new ExecutionPlan("plan1");
    executionPlan.defineStream(StreamDefinition.id("cseEventStream")
            .attribute("symbol", Attribute.Type.STRING)
            .attribute("price", Attribute.Type.FLOAT)
            .attribute("volume", Attribute.Type.INT));

    SiddhiManager siddhiManager = new SiddhiManager();
    ExecutionPlanRuntime executionPlanRuntime = siddhiManager.addExecutionPlan(executionPlan);

    executionPlanRuntime.addCallback("cseEventStream", new StreamCallback() {
        @Override
        public void receive(Event[] events) {
            count++;
            eventArrived = true;
        }
    });


    InputHandler inputHandler = executionPlanRuntime.getInputHandler("cseEventStream");
    inputHandler.send(new Object[]{"IBM", 75.6f, 100});
    inputHandler.send(new Object[]{"WSO2", 75.6f, 100});
    Thread.sleep(100);
    Assert.assertTrue(eventArrived);
    Assert.assertEquals(2, count);


}
 
Developer: sacjaya, Project: siddhi-3, Lines of code: 30, Source: PassThroughTestCase.java


Example 20: init

import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
public void init() {
    SiddhiConfiguration configuration = new SiddhiConfiguration();
    SiddhiManager siddhiManager = new SiddhiManager(configuration);

    if (this.executionPlan != null) {
        siddhiManager.addExecutionPlan(executionPlan);
    }
    this.siddhiManager = siddhiManager;
    for (final String streamId : exportedStreams) {
        siddhiManager.addCallback(streamId, new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                for (Event event : events) {
                    List<Object> asList = Arrays.asList(event.getData());
                    log.debug(componentID + ">Siddhi: Emit Event " + event);
                    if (useDefaultAsStreamName) {
                        getCollector().emit(asList);
                    } else {
                        getCollector().emit(streamId, asList);
                    }

                }
            }
        });
    }

}
 
Developer: lasanthafdo, Project: siddhi-storm, Lines of code: 28, Source: SiddhiTransactionalBolt.java



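Note: The org.wso2.siddhi.core.stream.output.StreamCallback examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects, and copyright belongs to the original authors. Refer to each project's license before redistributing or using the code. Do not reproduce this article without permission.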

