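This article collects typical usage examples of the Java class org.wso2.siddhi.core.stream.output.StreamCallback. If you are wondering how StreamCallback is used in practice, the selected examples below may help.
StreamCallback belongs to the org.wso2.siddhi.core.stream.output package. The 20 code examples below are sorted by popularity by default. They come from different projects and target different Siddhi API generations: some register callbacks directly on SiddhiManager, others on ExecutionPlanRuntime or SiddhiAppRuntime.
Example 1: main
import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class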
public static void main(String[] args) throws InterruptedException, SiddhiParserException {
// Create Siddhi Manager
SiddhiManager siddhiManager = new SiddhiManager();
siddhiManager.defineStream("define stream cseEventStream ( symbol string, price float, volume int )");
siddhiManager.addQuery("from cseEventStream [ price >= 50 ] " +
"select symbol, price "+
"insert into StockQuote ;");
siddhiManager.addCallback("StockQuote", new StreamCallback() {
@Override
public void receive(Event[] events) {
EventPrinter.print(events);
}
});
InputHandler inputHandler = siddhiManager.getInputHandler("cseEventStream");
inputHandler.send(new Object[]{"IBM", 75.6f, 100});
inputHandler.send(new Object[]{"GOOG", 50f, 100});
inputHandler.send(new Object[]{"IBM", 76.6f, 100});
inputHandler.send(new Object[]{"WSO2", 45.6f, 100});
Thread.sleep(500);
siddhiManager.shutdown();
}
Developer ID: redBorder, project: rb-bi, lines of code: 26, source: SimpleFilterSample.java
Example 2: getCallback
import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
/**
* Gets the callback that Siddhi will use to output messages
*
* @param streamName The stream that produced the message
* @param topic The destination topic for that message
* @param attributes Attributes from the message, which stores both attributes names and values.
* @return A StreamCallback that Siddhi will use to send events
*/
public StreamCallback getCallback(final String streamName, final String topic, final List<Attribute> attributes) {
return new StreamCallback() {
@Override
public void receive(Event[] events) {
for (Event event : events) {
// This map will store the final message
Map<String, Object> result = new HashMap<>();
// Get all the attributes values from the list of attributes
int index = 0;
for (Object object : event.getData()) {
String columnName = attributes.get(index++).getName();
result.put(columnName, object);
}
// Send the message to every sink
sinksManager.process(streamName, topic, result);
}
}
};
}
Developer ID: redBorder, project: cep, lines of code: 31, source: SiddhiCallback.java
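The core of Example 2 is pairing each attribute name with the value at the same position in the event data. The sketch below isolates that step in plain Java so it can run without Siddhi. `EventRowMapper` and `toMap` are hypothetical names, a list of strings stands in for `List<Attribute>`, and the length check is an added assumption that the original does not perform.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the mapping step in Example 2; it does not use Siddhi types.
final class EventRowMapper {

    // Pairs each attribute name with the value at the same index.
    // The length check is an assumption added for this sketch.
    static Map<String, Object> toMap(List<String> attributeNames, Object[] data) {
        if (attributeNames.size() != data.length) {
            throw new IllegalArgumentException(
                    "Expected " + attributeNames.size() + " values but got " + data.length);
        }
        // LinkedHashMap keeps the attribute order of the stream definition.
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < data.length; i++) {
            row.put(attributeNames.get(i), data[i]);
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> row =
                toMap(Arrays.asList("symbol", "price"), new Object[]{"IBM", 75.6f});
        System.out.println(row);
    }
}
```

Example 2 uses a `HashMap`, which does not guarantee iteration order. The insertion-ordered map here is a design choice for readable output, not a requirement of the callback.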
Example 3: start
import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
/**
* Starts the siddhi execution plan
*
* @param siddhiManager The manager that will manage the execution plan
* @param siddhiCallback The callback to be called when the execution plan creates a new message
* @throws ExecutionPlanException
*/
public void start(SiddhiManager siddhiManager, SiddhiCallback siddhiCallback) throws ExecutionPlanException {
this.executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(getFullExecutionPlan());
for (Map.Entry<String, String> entry : outputTopics.entrySet()) {
String streamName = entry.getKey();
String topic = entry.getValue();
AbstractDefinition abstractDefinition = executionPlanRuntime.getStreamDefinitionMap().get(streamName);
if (abstractDefinition != null) {
List<Attribute> attributes = abstractDefinition.getAttributeList();
StreamCallback streamCallback = siddhiCallback.getCallback(streamName, topic, attributes);
executionPlanRuntime.addCallback(streamName, streamCallback);
} else {
throw new InvalidExecutionPlanException("You specified an output that is not present on the execution plan");
}
}
executionPlanRuntime.start();
log.info("Started execution plan with id {} version {}", id, version);
}
Developer ID: redBorder, project: cep, lines of code: 29, source: SiddhiPlan.java
Example 4: createTestExecutionRuntime
import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
private SiddhiAppRuntime createTestExecutionRuntime() {
siddhiManager = new SiddhiManager();
String siddhiApp = "" +
"@app:name('callbackTest1') " +
"" +
"@async(buffer.size='2')" +
"define stream StockStream (symbol string, price float, volume long);" +
"" +
"@info(name = 'query1') " +
"@Parallel " +
"from StockStream[price + 0.0 > 0.0] " +
"select symbol, price " +
"insert into outputStream;";
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
@Override
public void receive(Event[] events) {
EventPrinter.print(events);
count.addAndGet(events.length);
eventArrived = true;
}
});
return siddhiAppRuntime;
}
Developer ID: wso2, project: siddhi, lines of code: 26, source: ExceptionHandlerTestCase.java
Example 5: distinctFilterTest
import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
@Test
public void distinctFilterTest() throws InterruptedException {
sm.defineStream("define stream testStream (c1 string, c2 float, c3 int);");
sm.addQuery("from testStream #window.stratio:distinct(c1) select c1, c2,c3 insert into resultStream;");
sm.addCallback("resultStream", new StreamCallback() {
@Override
public void receive(Event[] events) {
for (Event event : events) {
if (event instanceof InEvent) {
count.getAndIncrement();
}
}
}
});
sm.getInputHandler("testStream").send(new Object[] { "KEY_A", 10f, 20 });
sm.getInputHandler("testStream").send(new Object[] { "KEY_A", 20f, 30 });
sm.getInputHandler("testStream").send(new Object[] { "KEY_A", 30f, 40 });
sm.getInputHandler("testStream").send(new Object[] { "KEY_B", 30f, 40 });
Thread.sleep(500);
assertEquals(2, count.get());
}
Developer ID: Stratio, project: Decision, lines of code: 25, source: DistinctWindowTest.java
Example 6: main
import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
public static void main(String[] args) throws InterruptedException {
SiddhiManager siddhiManager = new SiddhiManager();
String cseEventStream = "define stream cseEventStream (symbol string, price float, volume long);";
String query = "from cseEventStream[70 > price] select symbol,price,volume insert into outputStream ;";
siddhiManager.defineStream(cseEventStream);
siddhiManager.addQuery(query);
siddhiManager.addCallback("outputStream", new StreamCallback() {
@Override
public void receive(Event[] events) {
count++;
if (count % 10000000 == 0) {
long end = System.currentTimeMillis();
double tp = (10000000 * 1000.0 / (end - start));
System.out.println("Throughput = " + tp + " Event/sec");
start = end;
}
}
});
InputHandler inputHandler = siddhiManager.getInputHandler("cseEventStream");
while (true) {
inputHandler.send(new Object[]{"WSO2", 55.6f, 100});
inputHandler.send(new Object[]{"IBM", 75.6f, 100});
inputHandler.send(new Object[]{"WSO2", 55.6f, 100});
inputHandler.send(new Object[]{"IBM", 75.6f, 100});
inputHandler.send(new Object[]{"WSO2", 55.6f, 100});
inputHandler.send(new Object[]{"IBM", 75.6f, 100});
inputHandler.send(new Object[]{"WSO2", 55.6f, 100});
inputHandler.send(new Object[]{"IBM", 75.6f, 100});
inputHandler.send(new Object[]{"WSO2", 55.6f, 100});
inputHandler.send(new Object[]{"IBM", 75.6f, 100});
}
}
Developer ID: redBorder, project: rb-bi, lines of code: 35, source: SimpleFilterSingleQueryPerformance.java
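The callback in Example 6 relies on `count` and `start` fields that the excerpt does not show, and it computes throughput inline as batch size times 1000.0 divided by elapsed milliseconds. The sketch below pulls that arithmetic into a small self-contained class. `ThroughputMeter` and its method names are hypothetical, and time is passed in explicitly so the behavior is deterministic.

```java
// Hypothetical helper, not part of Siddhi: isolates the throughput arithmetic
// that the callback in Example 6 performs inline.
final class ThroughputMeter {
    private final long batchSize;
    private long count;
    private long windowStartMillis;

    ThroughputMeter(long batchSize, long startMillis) {
        this.batchSize = batchSize;
        this.windowStartMillis = startMillis;
    }

    // Same formula as the example: events * 1000.0 / elapsed milliseconds.
    static double eventsPerSecond(long events, long elapsedMillis) {
        return events * 1000.0 / elapsedMillis;
    }

    // Records one delivery. Returns the rate when a batch completes, or -1 otherwise.
    synchronized double record(long nowMillis) {
        count++;
        if (count % batchSize != 0) {
            return -1;
        }
        double rate = eventsPerSecond(batchSize, nowMillis - windowStartMillis);
        windowStartMillis = nowMillis; // start the next measurement window
        return rate;
    }

    public static void main(String[] args) {
        ThroughputMeter meter = new ThroughputMeter(2, 0L);
        meter.record(500L);
        System.out.println("Throughput = " + meter.record(1000L) + " Event/sec");
    }
}
```

Like the original, this counts calls to the callback rather than events. If a single `receive` call delivers several events, adding `events.length` instead of 1 would give a per-event rate.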
Example 7: main
import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
public static void main(String[] args) throws InterruptedException {
SiddhiManager siddhiManager = new SiddhiManager();
String cseEventStream = " define stream cseEventStream (symbol string, price float, volume int);";
String query1 = "from cseEventStream[70 > price] select symbol,price,volume insert into outputStream ;";
String query2 = "from cseEventStream[volume > 90] select symbol,price,volume insert into outputStream ;";
siddhiManager.defineStream(cseEventStream);
siddhiManager.addQuery(query1);
siddhiManager.addQuery(query2);
siddhiManager.addCallback("outputStream", new StreamCallback() {
@Override
public void receive(Event[] events) {
count++;
if (count % 1000000 == 0) {
long end = System.currentTimeMillis();
double tp = (1000000 * 1000.0 / (end - start));
System.out.println("Throughput = " + tp + " Event/sec");
start = end;
}
}
});
InputHandler inputHandler = siddhiManager.getInputHandler("cseEventStream");
while (true) {
inputHandler.send(new Object[]{"WSO2", 55.6f, 100});
inputHandler.send(new Object[]{"IBM", 75.6f, 100});
inputHandler.send(new Object[]{"WSO2", 100f, 80});
inputHandler.send(new Object[]{"IBM", 75.6f, 100});
inputHandler.send(new Object[]{"WSO2", 55.6f, 100});
inputHandler.send(new Object[]{"IBM", 75.6f, 100});
inputHandler.send(new Object[]{"WSO2", 100f, 80});
inputHandler.send(new Object[]{"IBM", 75.6f, 100});
}
}
Developer ID: redBorder, project: rb-bi, lines of code: 36, source: SimpleFilterMultipleQueryPerformance.java
Example 8: testQuery1
import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
@Test
public void testQuery1() throws InterruptedException {
log.info("StreamCallback test1");
SiddhiManager siddhiManager = new SiddhiManager();
InputHandler inputHandler = siddhiManager.defineStream(QueryFactory.createStreamDefinition().name("cseEventStream").attribute("symbol", Attribute.Type.STRING).attribute("price", Attribute.Type.FLOAT).attribute("volume", Attribute.Type.INT));
Query query = QueryFactory.createQuery();
query.from(QueryFactory.inputStream("cseEventStream"));
query.select(
QueryFactory.outputSelector().
select("symbol", Expression.variable("symbol")).
select("price", Expression.variable("price")).
select("volume", Expression.variable("volume"))
);
query.insertInto("StockQuote");
String queryReference = siddhiManager.addQuery(query);
siddhiManager.addCallback("StockQuote", new StreamCallback() {
@Override
public void receive(Event[] events) {
EventPrinter.print(events);
Assert.assertTrue("IBM".equals(events[0].getData(0)) || "WSO2".equals(events[0].getData(0)));
count++;
}
});
// InputHandler inputHandler = siddhiManager.getInputHandler("cseEventStream");
inputHandler.send(new Object[]{"IBM", 75.6f, 100});
inputHandler.send(new Object[]{"WSO2", 75.6f, 100});
Thread.sleep(500);
siddhiManager.shutdown();
Assert.assertEquals(2, count);
}
Developer ID: redBorder, project: rb-bi, lines of code: 37, source: CallbackTestCase.java
Example 9: testTimeBatchAndSequence
import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
@Test
public void testTimeBatchAndSequence() throws Exception {
log.info("testTimeBatchAndSequence OUT 1");
SiddhiManager siddhi = new SiddhiManager();
InputHandler i1 = siddhi.defineStream("define stream received_reclamations (timestamp long, product_id string, defect_category string)");
siddhi.addQuery("from received_reclamations#window.timeBatch(1 sec) " +
"select product_id, defect_category, count(*) as num group by product_id, defect_category " +
"insert into reclamation_averages");
final CountDownLatch increaseEventReceived = new CountDownLatch(1);
siddhi.addQuery("from a=reclamation_averages[num > 1], b=reclamation_averages[num > a.num and product_id == a.product_id and defect_category == a.defect_category] " +
"select a.product_id, a.defect_category, a.num as oldNum, b.num as newNum " +
"insert into increased_reclamations");
siddhi.addCallback("increased_reclamations", new StreamCallback() {
@Override
public void receive(Event[] events) {
EventPrinter.print(events);
increaseEventReceived.countDown();
}
});
for (int i = 0; i < 5; i++) {
i1.send(new Object[]{System.currentTimeMillis(), "abc", "123"});
Thread.sleep(1);
}
Thread.sleep(1000);
for (int i = 0; i < 8; i++) {
i1.send(new Object[] { System.currentTimeMillis(), "abc", "123" });
Thread.sleep(1);
}
assertTrue("Did not receive event in stream increased_reclamations", increaseEventReceived.await(1000, TimeUnit.MILLISECONDS));
siddhi.shutdown();
}
Developer ID: redBorder, project: rb-bi, lines of code: 39, source: SequenceTestCase.java
Example 10: testSnapshotOutputRateLimitQuery6
import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
@Test
public void testSnapshotOutputRateLimitQuery6() throws InterruptedException {
log.info("SnapshotOutputRateLimit test6");
SiddhiManager siddhiManager = new SiddhiManager();
siddhiManager.defineStream("define stream LoginEvents (timeStamp long, ip string) ");
String queryReference = siddhiManager.addQuery("from LoginEvents#window.time(1 sec) " +
"select ip " +
"output snapshot every 1 sec " +
"insert into uniqueIps for all-events ;");
siddhiManager.addCallback("uniqueIps", new StreamCallback() {
@Override
public void receive(Event[] events) {
EventPrinter.print(events);
count++;
eventArrived = true;
}
});
InputHandler loginSucceedEvents = siddhiManager.getInputHandler("LoginEvents");
Thread.sleep(100);
loginSucceedEvents.send(new Object[]{System.currentTimeMillis(), "192.10.1.5"});
loginSucceedEvents.send(new Object[]{System.currentTimeMillis(), "192.10.1.3"});
Thread.sleep(2200);
Assert.assertEquals("Event arrived", true, eventArrived);
Assert.assertEquals("Number of output event value", 1, count);
siddhiManager.shutdown();
}
Developer ID: redBorder, project: rb-bi, lines of code: 34, source: SnapshotOutputRateLimitTestCase.java
Example 11: testQuery1
import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
@Test
public void testQuery1() throws InterruptedException, SiddhiParserException {
log.info("Remove Query test1");
SiddhiManager siddhiManager = new SiddhiManager();
InputHandler inputHandler = siddhiManager.defineStream("define stream cseStream ( symbol string, price float, volume int )");
siddhiManager.addCallback("outStream", new StreamCallback() {
@Override
public void receive(Event[] events) {
EventPrinter.print(events);
Assert.assertTrue("IBM".equals(events[0].getData(0)) || "WSO2".equals(events[0].getData(0)));
count++;
}
});
String queryReference = siddhiManager.addQuery("from cseStream[price>10] " +
"select symbol, price, volume " +
" having price*12 >100 " +
"insert into outStream ;");
inputHandler.send(new Object[]{"IBM", 75.6f, 100});
inputHandler.send(new Object[]{"WSO2", 75.6f, 100});
siddhiManager.removeQuery(queryReference);
// InputHandler inputHandler = siddhiManager.getInputHandler("cseEventStream");
inputHandler.send(new Object[]{"IBM", 75.6f, 100});
inputHandler.send(new Object[]{"WSO2", 75.6f, 100});
siddhiManager.shutdown();
Assert.assertEquals(2, count);
}
Developer ID: redBorder, project: rb-bi, lines of code: 36, source: AddRemoveTestCase.java
Example 12: testSimplePlan
import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
@Test
public void testSimplePlan() throws InterruptedException {
SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime(
"define stream inStream (name string, value double);"
+ "from inStream insert into outStream");
runtime.start();
final List<Object[]> received = new ArrayList<>(3);
InputHandler inputHandler = runtime.getInputHandler("inStream");
Assert.assertNotNull(inputHandler);
try {
runtime.getInputHandler("unknownStream");
Assert.fail("Should throw exception for getting input handler for unknown streamId.");
} catch (Exception ex) {
// Expected exception for getting input handler for illegal streamId.
}
runtime.addCallback("outStream", new StreamCallback() {
@Override
public void receive(Event[] events) {
for (Event event : events) {
received.add(event.getData());
}
}
});
inputHandler.send(new Object[]{"a", 1.1});
inputHandler.send(new Object[]{"b", 1.2});
inputHandler.send(new Object[]{"c", 1.3});
Thread.sleep(100);
Assert.assertEquals(3, received.size());
Assert.assertArrayEquals(new Object[]{"a", 1.1}, received.get(0));
Assert.assertArrayEquals(new Object[]{"b", 1.2}, received.get(1));
Assert.assertArrayEquals(new Object[]{"c", 1.3}, received.get(2));
}
Developer ID: haoch, project: flink-siddhi, lines of code: 37, source: SiddhiSyntaxTest.java
Example 13: testSimplePlan
import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
@Test
public void testSimplePlan() throws InterruptedException {
SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime(
"define stream inStream (name string, value double);"
+ "from inStream insert into outStream");
runtime.start();
final List<Object[]> received = new ArrayList<>(3);
InputHandler inputHandler = runtime.getInputHandler("inStream");
Assert.assertNotNull(inputHandler);
try {
runtime.getInputHandler("unknownStream");
Assert.fail("Should throw exception for getting input handler for unknown streamId.");
} catch (Exception ex) {
// Expected exception for getting input handler for illegal streamId.
}
runtime.addCallback("outStream", new StreamCallback() {
@Override
public void receive(Event[] events) {
for (Event event : events) {
received.add(event.getData());
}
}
});
inputHandler.send(new Object[]{"a", 1.1});
inputHandler.send(new Object[]{"b", 1.2});
inputHandler.send(new Object[]{"c", 1.3});
Thread.sleep(100);
Assert.assertEquals(3, received.size());
Assert.assertArrayEquals(new Object[]{"a", 1.1}, received.get(0));
Assert.assertArrayEquals(new Object[]{"b", 1.2}, received.get(1));
Assert.assertArrayEquals(new Object[]{"c", 1.3}, received.get(2));
}
Developer ID: apache, project: bahir-flink, lines of code: 37, source: SiddhiSyntaxTest.java
Example 14: testFraudMoreThanOneOrderIn10MinutesWithDifferentCards
import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
@Test
public void testFraudMoreThanOneOrderIn10MinutesWithDifferentCards() throws Exception {
count.set(0);
LOGGER.debug("[Fraud] Checking if there are more than one order in 10 minutes with different credit cards");
LOGGER.debug("--> Creating Order Queries and Loading dataset");
//createOrdersQueries();
ORDER_QUERY3_ID= sm.addQuery(OrdersQueries.QUERY_MORE_1_ORDER_IN_10M);
sm.addCallback(OrdersQueries.STREAM_FRAUD, new StreamCallback() {
@Override
public void receive(Event[] events) {
for (Event event : events) {
if (event instanceof InEvent && event.getData(6).toString().equals("orders-3")) {
count.getAndIncrement();
LOGGER.debug("Found event: " + event.toString());
}
}
}
});
loadOrders(DATASET_ORDERS2);
Thread.sleep(500);
assertEquals(2, count.get());
sm.removeQuery(ORDER_QUERY3_ID);
}
Developer ID: Stratio, project: Decision, lines of code: 28, source: OrdersQueriesValidationTest.java
Example 15: callbackTest3
import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
@Test(expectedExceptions = DefinitionNotExistException.class)
public void callbackTest3() throws InterruptedException {
log.info("callback test3");
SiddhiManager siddhiManager = new SiddhiManager();
String siddhiApp = "" +
"@app:name('callbackTest1') " +
"" +
"define stream StockStream (symbol string, price float, volume long);" +
"" +
"@info(name = 'query1') " +
"from StockStream[70 > price] " +
"select symbol, price " +
"insert into outputStream;";
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
siddhiAppRuntime.addCallback("outputStream2", new StreamCallback() {
@Override
public void receive(Event[] events) {
EventPrinter.print(events);
}
});
siddhiAppRuntime.shutdown();
}
Developer ID: wso2, project: siddhi, lines of code: 28, source: CallbackTestCase.java
Example 16: inMemoryTestCase7
import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
@Test
public void inMemoryTestCase7() throws InterruptedException {
log.info("Test inMemory 7");
String streams = "" +
"@app:name('TestSiddhiApp')" +
"@source(type='testTrpInMemory', topic='Foo', prop1='hi', prop2='test', fail='true', " +
" @map(type='passThrough', @attributes(symbol='trp:symbol'," +
" volume='volume',price='trp:price'))) " +
"define stream FooStream (symbol string, price string, volume long); " +
"define stream BarStream (symbol string, price string, volume long); ";
String query = "" +
"from FooStream " +
"select * " +
"insert into BarStream; ";
SiddhiManager siddhiManager = new SiddhiManager();
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query);
siddhiAppRuntime.addCallback("BarStream", new StreamCallback() {
@Override
public void receive(Event[] events) {
EventPrinter.print(events);
wso2Count.incrementAndGet();
for (Event event : events) {
AssertJUnit.assertArrayEquals(event.getData(), new Object[]{"hi", "test", 100L});
}
}
});
siddhiAppRuntime.start();
InMemoryBroker.publish("Foo", new Event(System.currentTimeMillis(), new Object[]{"WSO2", "in", 100L}));
Thread.sleep(100);
//assert event count
AssertJUnit.assertEquals("Number of events", 0, wso2Count.get());
siddhiAppRuntime.shutdown();
}
Developer ID: wso2, project: siddhi, lines of code: 39, source: InMemoryTransportTestCase.java
Example 17: testPartitionQuery
import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
@Test
public void testPartitionQuery() throws InterruptedException {
log.info("Partition test");
SiddhiManager siddhiManager = new SiddhiManager();
String siddhiApp = "@app:name('PartitionTest') " +
"define stream streamA (symbol string, price int);" +
"partition with (symbol of streamA) " +
"begin " +
"@info(name = 'query1') " +
"from streamA select symbol,price insert into StockQuote ; " +
"end ";
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
StreamCallback streamCallback = new StreamCallback() {
@Override
public void receive(Event[] events) {
EventPrinter.print(events);
AssertJUnit.assertTrue("IBM".equals(events[0].getData(0)) || "WSO2".equals(events[0].getData(0)));
count.addAndGet(events.length);
eventArrived = true;
}
};
siddhiAppRuntime.addCallback("StockQuote", streamCallback);
InputHandler inputHandler = siddhiAppRuntime.getInputHandler("streamA");
siddhiAppRuntime.start();
inputHandler.send(new Object[]{"IBM", 700});
inputHandler.send(new Object[]{"WSO2", 60});
inputHandler.send(new Object[]{"WSO2", 60});
SiddhiTestHelper.waitForEvents(100, 3, count, 60000);
AssertJUnit.assertTrue(eventArrived);
AssertJUnit.assertEquals(3, count.get());
siddhiAppRuntime.shutdown();
}
Developer ID: wso2, project: siddhi, lines of code: 37, source: PartitionTestCase.java
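Example 17 waits for the callback with `SiddhiTestHelper.waitForEvents(100, 3, count, 60000)` rather than a fixed `Thread.sleep`. The arguments suggest a poll interval, an expected count, the shared counter, and a timeout. The sketch below shows how a polling wait of that shape might behave. It is an illustration under that assumption, not Siddhi's implementation, and `PollingWait.waitForCount` is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical polling helper; illustrates the idea only and is not Siddhi's code.
final class PollingWait {

    // Polls the counter until it reaches the expected value or the timeout elapses.
    // Returns true if the expected count was reached, false on timeout or interrupt.
    static boolean waitForCount(long sleepMillis, int expected,
                                AtomicInteger actual, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (actual.get() < expected) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out before enough events arrived
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger count = new AtomicInteger();
        // Simulates a callback thread delivering three events.
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                count.incrementAndGet();
            }
        });
        producer.start();
        System.out.println("reached = " + waitForCount(10, 3, count, 1000));
        producer.join();
    }
}
```

Compared with a fixed sleep, polling returns as soon as the events arrive and only pays the full timeout when the test is going to fail anyway.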
Example 18: testPartitionQuery2
import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
@Test
public void testPartitionQuery2() throws InterruptedException {
log.info("Partition test2");
SiddhiManager siddhiManager = new SiddhiManager();
String siddhiApp = "@app:name('PartitionTest2') " +
"define stream cseEventStream (symbol string, price float,volume int);"
+ "define stream StockStream1 (symbol string, price float,volume int);"
+ "partition with (symbol of cseEventStream , symbol of StockStream1) begin @info(name = 'query1') " +
"from cseEventStream[700>price] select symbol,sum(price) as price,volume insert into OutStockStream ;" +
" end ";
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
siddhiAppRuntime.addCallback("OutStockStream", new StreamCallback() {
@Override
public void receive(Event[] events) {
EventPrinter.print(events);
count.addAndGet(events.length);
eventArrived = true;
}
});
InputHandler inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream");
siddhiAppRuntime.start();
inputHandler.send(new Object[]{"IBM", 75.6f, 100});
inputHandler.send(new Object[]{"WSO2", 75.6f, 100});
inputHandler.send(new Object[]{"IBM", 75.6f, 100});
inputHandler.send(new Object[]{"ORACLE", 75.6f, 100});
SiddhiTestHelper.waitForEvents(100, 4, count, 60000);
AssertJUnit.assertEquals(4, count.get());
siddhiAppRuntime.shutdown();
}
Developer ID: wso2, project: siddhi, lines of code: 35, source: PartitionTestCase.java
Example 19: streamCallbackTest
import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
@Test
public void streamCallbackTest() throws InterruptedException {
log.info("stream callback test");
ExecutionPlan executionPlan = new ExecutionPlan("plan1");
executionPlan.defineStream(StreamDefinition.id("cseEventStream").attribute("symbol", Attribute.Type.STRING).attribute("price", Attribute.Type.FLOAT).attribute("volume", Attribute.Type.INT));
SiddhiManager siddhiManager = new SiddhiManager();
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.addExecutionPlan(executionPlan);
executionPlanRuntime.addCallback("cseEventStream", new StreamCallback() {
@Override
public void receive(Event[] events) {
count++;
eventArrived = true;
}
});
InputHandler inputHandler = executionPlanRuntime.getInputHandler("cseEventStream") ;
inputHandler.send(new Object[]{"IBM", 75.6f, 100});
inputHandler.send(new Object[]{"WSO2", 75.6f, 100});
Thread.sleep(100);
Assert.assertTrue(eventArrived);
Assert.assertEquals(2, count);
}
Developer ID: sacjaya, project: siddhi-3, lines of code: 30, source: PassThroughTestCase.java
Example 20: init
import org.wso2.siddhi.core.stream.output.StreamCallback; // import the required package/class
public void init() {
SiddhiConfiguration configuration = new SiddhiConfiguration();
SiddhiManager siddhiManager = new SiddhiManager(configuration);
if(this.executionPlan != null) {
siddhiManager.addExecutionPlan(executionPlan);
}
this.siddhiManager = siddhiManager;
for (final String streamId : exportedStreams) {
siddhiManager.addCallback(streamId, new StreamCallback() {
@Override
public void receive(Event[] events) {
for (Event event : events) {
List<Object> asList = Arrays.asList(event.getData());
log.debug(componentID + ">Siddhi: Emit Event " + event);
if (useDefaultAsStreamName) {
getCollector().emit(asList);
} else {
getCollector().emit(streamId, asList);
}
}
}
});
}
}
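Developer ID: lasanthafdo, project: siddhi-storm, lines of code: 28, source: SiddhiTransactionalBolt.java
Note: The org.wso2.siddhi.core.stream.output.StreamCallback examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects, and copyright remains with the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.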