• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java Status类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.flume.Sink.Status的典型用法代码示例。如果您正苦于以下问题:Java Status类的具体用法?Java Status怎么用?Java Status使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



Status类属于org.apache.flume.Sink包,在下文中一共展示了Status类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: process

import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Override
public Status process() throws EventDeliveryException {
  Status status = null;

  Iterator<Sink> sinkIterator = selector.createSinkIterator();
  while (sinkIterator.hasNext()) {
    Sink sink = sinkIterator.next();
    try {
      status = sink.process();
      break;
    } catch (Exception ex) {
      selector.informSinkFailed(sink);
      LOGGER.warn("Sink failed to consume event. "
          + "Attempting next sink if available.", ex);
    }
  }

  if (status == null) {
    throw new EventDeliveryException("All configured sinks have failed");
  }

  return status;
}
 
开发者ID:moueimei,项目名称:flume-release-1.7.0,代码行数:24,代码来源:LoadBalancingSinkProcessor.java


示例2: ensureGroupConfigurationCorrectlyUsed

import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void ensureGroupConfigurationCorrectlyUsed() throws Exception {
    when(channel.take()).thenReturn(event);
    when(event.getBody()).thenReturn("something".getBytes());

    Context context = new Context();
    context.put("defaultRollback", "true");
    context.put("defaultBackoff", "true");
    context.put("defaultIncrementMetrics", "false");
    context.put("rollback.2XX", "false");
    context.put("backoff.2XX", "false");
    context.put("incrementMetrics.2XX", "true");

    executeWithMocks(true, Status.READY, true, true, context, HttpURLConnection.HTTP_OK);
    executeWithMocks(true, Status.READY, true, true, context, HttpURLConnection.HTTP_NO_CONTENT);
}
 
开发者ID:hmrc,项目名称:flume-http-sink,代码行数:17,代码来源:HttpSinkTest.java


示例3: ensureSingleStatusConfigurationOverridesGroupConfigurationCorrectly

import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void ensureSingleStatusConfigurationOverridesGroupConfigurationCorrectly() throws Exception {
    when(channel.take()).thenReturn(event);
    when(event.getBody()).thenReturn("something".getBytes());

    Context context = new Context();
    context.put("rollback.2XX", "false");
    context.put("backoff.2XX", "false");
    context.put("incrementMetrics.2XX", "true");
    context.put("rollback.200", "true");
    context.put("backoff.200", "true");
    context.put("incrementMetrics.200", "false");

    executeWithMocks(true, Status.READY, true, true, context, HttpURLConnection.HTTP_NO_CONTENT);
    executeWithMocks(false, Status.BACKOFF, false, true, context, HttpURLConnection.HTTP_OK);
}
 
开发者ID:hmrc,项目名称:flume-http-sink,代码行数:17,代码来源:HttpSinkTest.java


示例4: testFieldType

import org.apache.flume.Sink.Status; //导入依赖的package包/类
private void testFieldType(final String field, final String value, final Status result) {
  //System.out.println();
  headers.put(field, value);
  addEventToChannel(headers);
  boolean thrown = false;
  try {
    Status status = sink.process();
    Assert.assertEquals(result, status);
  } catch (EventDeliveryException ex) {
    thrown = true;
  }
  final Transaction tx = channel.getTransaction();
  tx.begin();
  final Event nextEvent = channel.take();
  tx.commit();
  tx.close();
  if (result == Status.READY) {
    Assert.assertFalse(thrown);
    Assert.assertNull(nextEvent);
  } else {
    Assert.assertTrue(thrown);
    Assert.assertNotNull(nextEvent);
  }
}
 
开发者ID:Stratio,项目名称:ingestion,代码行数:25,代码来源:CassandraDataTypesIT.java


示例5: testDefaultConfiguration

import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void testDefaultConfiguration() throws Exception {
  // If no selector is specified, the round-robin selector should be used
  Channel ch = new MockChannel();
  int n = 100;
  int numEvents = 3 * n;
  for (int i = 0; i < numEvents; i++) {
    ch.put(new MockEvent("test" + i));
  }

  MockSink s1 = new MockSink(1);
  s1.setChannel(ch);

  MockSink s2 = new MockSink(2);
  s2.setChannel(ch);

  MockSink s3 = new MockSink(3);
  s3.setChannel(ch);

  List<Sink> sinks = new ArrayList<Sink>();
  sinks.add(s1);
  sinks.add(s2);
  sinks.add(s3);

  LoadBalancingSinkProcessor lbsp = getProcessor(sinks, new Context());

  Status s = Status.READY;
  while (s != Status.BACKOFF) {
    s = lbsp.process();
  }

  Assert.assertTrue(s1.getEvents().size() == n);
  Assert.assertTrue(s2.getEvents().size() == n);
  Assert.assertTrue(s3.getEvents().size() == n);

}
 
开发者ID:moueimei,项目名称:flume-release-1.7.0,代码行数:37,代码来源:TestLoadBalancingSinkProcessor.java


示例6: testRandomPersistentFailure

import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void testRandomPersistentFailure() throws Exception {
  Channel ch = new MockChannel();
  int n = 100;
  int numEvents = 3 * n;
  for (int i = 0; i < numEvents; i++) {
    ch.put(new MockEvent("test" + i));
  }

  MockSink s1 = new MockSink(1);
  s1.setChannel(ch);

  MockSink s2 = new MockSink(2);
  s2.setChannel(ch);

  // s2 always fails
  s2.setFail(true);

  MockSink s3 = new MockSink(3);
  s3.setChannel(ch);

  List<Sink> sinks = new ArrayList<Sink>();
  sinks.add(s1);
  sinks.add(s2);
  sinks.add(s3);

  LoadBalancingSinkProcessor lbsp = getProcessor("random",sinks, false);

  Status s = Status.READY;
  while (s != Status.BACKOFF) {
    s = lbsp.process();
  }

  Assert.assertTrue(s2.getEvents().size() == 0);
  Assert.assertTrue(s1.getEvents().size() + s3.getEvents().size() == 3 * n);
}
 
开发者ID:moueimei,项目名称:flume-release-1.7.0,代码行数:37,代码来源:TestLoadBalancingSinkProcessor.java


示例7: testRoundRobinPersistentFailure

import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void testRoundRobinPersistentFailure() throws Exception {
  Channel ch = new MockChannel();
  int n = 100;
  int numEvents = 3 * n;
  for (int i = 0; i < numEvents; i++) {
    ch.put(new MockEvent("test" + i));
  }

  MockSink s1 = new MockSink(1);
  s1.setChannel(ch);

  MockSink s2 = new MockSink(2);
  s2.setChannel(ch);

  // s2 always fails
  s2.setFail(true);

  MockSink s3 = new MockSink(3);
  s3.setChannel(ch);

  List<Sink> sinks = new ArrayList<Sink>();
  sinks.add(s1);
  sinks.add(s2);
  sinks.add(s3);

  LoadBalancingSinkProcessor lbsp = getProcessor("round_robin",sinks, false);

  Status s = Status.READY;
  while (s != Status.BACKOFF) {
    s = lbsp.process();
  }

  Assert.assertTrue(s1.getEvents().size() == n);
  Assert.assertTrue(s2.getEvents().size() == 0);
  Assert.assertTrue(s3.getEvents().size() == 2 * n);
}
 
开发者ID:moueimei,项目名称:flume-release-1.7.0,代码行数:38,代码来源:TestLoadBalancingSinkProcessor.java


示例8: testRoundRobinNoFailure

import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void testRoundRobinNoFailure() throws Exception {

  Channel ch = new MockChannel();
  int n = 100;
  int numEvents = 3 * n;
  for (int i = 0; i < numEvents; i++) {
    ch.put(new MockEvent("test" + i));
  }

  MockSink s1 = new MockSink(1);
  s1.setChannel(ch);

  MockSink s2 = new MockSink(2);
  s2.setChannel(ch);

  MockSink s3 = new MockSink(3);
  s3.setChannel(ch);

  List<Sink> sinks = new ArrayList<Sink>();
  sinks.add(s1);
  sinks.add(s2);
  sinks.add(s3);

  LoadBalancingSinkProcessor lbsp = getProcessor("round_robin",sinks, false);

  Status s = Status.READY;
  while (s != Status.BACKOFF) {
    s = lbsp.process();
  }

  Assert.assertTrue(s1.getEvents().size() == n);
  Assert.assertTrue(s2.getEvents().size() == n);
  Assert.assertTrue(s3.getEvents().size() == n);
}
 
开发者ID:moueimei,项目名称:flume-release-1.7.0,代码行数:36,代码来源:TestLoadBalancingSinkProcessor.java


示例9: testCustomSelector

import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void testCustomSelector() throws Exception {
  Channel ch = new MockChannel();
  int n = 10;
  int numEvents = n;
  for (int i = 0; i < numEvents; i++) {
    ch.put(new MockEvent("test" + i));
  }

  MockSink s1 = new MockSink(1);
  s1.setChannel(ch);

  // s1 always fails
  s1.setFail(true);

  MockSink s2 = new MockSink(2);
  s2.setChannel(ch);

  MockSink s3 = new MockSink(3);
  s3.setChannel(ch);

  List<Sink> sinks = new ArrayList<Sink>();
  sinks.add(s1);
  sinks.add(s2);
  sinks.add(s3);

  // This selector will result in all events going to s2
  Context ctx = getContext(FixedOrderSelector.class.getCanonicalName());
  ctx.put("selector." + FixedOrderSelector.SET_ME, "foo");
  LoadBalancingSinkProcessor lbsp = getProcessor(sinks, ctx);

  Sink.Status s = Sink.Status.READY;
  while (s != Sink.Status.BACKOFF) {
    s = lbsp.process();
  }

  Assert.assertTrue(s1.getEvents().size() == 0);
  Assert.assertTrue(s2.getEvents().size() == n);
  Assert.assertTrue(s3.getEvents().size() == 0);
}
 
开发者ID:moueimei,项目名称:flume-release-1.7.0,代码行数:41,代码来源:TestLoadBalancingSinkProcessor.java


示例10: process

import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Override
public Status process() throws EventDeliveryException {
  if (fail) {
    throw new EventDeliveryException("failed");
  }
  Event e = this.getChannel().take();
  if (e == null) {
    return Status.BACKOFF;
  }

  events.add(e);
  return Status.READY;
}
 
开发者ID:moueimei,项目名称:flume-release-1.7.0,代码行数:14,代码来源:TestLoadBalancingSinkProcessor.java


示例11: testEmptyChannelResultsInStatusBackoff

import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void testEmptyChannelResultsInStatusBackoff()
    throws InterruptedException, LifecycleException, EventDeliveryException {
  LOG.debug("Starting...");
  Context context = new Context();
  Channel channel = new MemoryChannel();
  context.put("hdfs.path", testPath);
  context.put("keep-alive", "0");
  Configurables.configure(sink, context);
  Configurables.configure(channel, context);
  sink.setChannel(channel);
  sink.start();
  Assert.assertEquals(Status.BACKOFF, sink.process());
  sink.stop();
}
 
开发者ID:moueimei,项目名称:flume-release-1.7.0,代码行数:16,代码来源:TestHDFSEventSink.java


示例12: testSlowAppendFailure

import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void testSlowAppendFailure() throws InterruptedException,
    LifecycleException, EventDeliveryException, IOException {

  LOG.debug("Starting...");
  final String fileName = "FlumeData";
  final long rollCount = 5;
  final long batchSize = 2;
  final int numBatches = 2;
  String newPath = testPath + "/singleBucket";
  int i = 1, j = 1;

  // clear the test directory
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);
  Path dirPath = new Path(newPath);
  fs.delete(dirPath, true);
  fs.mkdirs(dirPath);

  // create HDFS sink with slow writer
  HDFSTestWriterFactory badWriterFactory = new HDFSTestWriterFactory();
  sink = new HDFSEventSink(badWriterFactory);

  Context context = new Context();
  context.put("hdfs.path", newPath);
  context.put("hdfs.filePrefix", fileName);
  context.put("hdfs.rollCount", String.valueOf(rollCount));
  context.put("hdfs.batchSize", String.valueOf(batchSize));
  context.put("hdfs.fileType", HDFSTestWriterFactory.TestSequenceFileType);
  context.put("hdfs.callTimeout", Long.toString(1000));
  Configurables.configure(sink, context);

  Channel channel = new MemoryChannel();
  Configurables.configure(channel, context);

  sink.setChannel(channel);
  sink.start();

  Calendar eventDate = Calendar.getInstance();

  // push the event batches into channel
  for (i = 0; i < numBatches; i++) {
    Transaction txn = channel.getTransaction();
    txn.begin();
    for (j = 1; j <= batchSize; j++) {
      Event event = new SimpleEvent();
      eventDate.clear();
      eventDate.set(2011, i, i, i, 0); // yy mm dd
      event.getHeaders().put("timestamp",
          String.valueOf(eventDate.getTimeInMillis()));
      event.getHeaders().put("hostname", "Host" + i);
      event.getHeaders().put("slow", "1500");
      event.setBody(("Test." + i + "." + j).getBytes());
      channel.put(event);
    }
    txn.commit();
    txn.close();

    // execute sink to process the events
    Status satus = sink.process();

    // verify that the append returned backoff due to timeotu
    Assert.assertEquals(satus, Status.BACKOFF);
  }

  sink.stop();
}
 
开发者ID:moueimei,项目名称:flume-release-1.7.0,代码行数:68,代码来源:TestHDFSEventSink.java


示例13: shouldIndexFiveEventsOverThreeBatches

import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void shouldIndexFiveEventsOverThreeBatches() throws Exception {
  parameters.put(BATCH_SIZE, "2");
  Configurables.configure(fixture, new Context(parameters));
  Channel channel = bindAndStartChannel(fixture);

  int numberOfEvents = 5;
  Event[] events = new Event[numberOfEvents];

  Transaction tx = channel.getTransaction();
  tx.begin();
  for (int i = 0; i < numberOfEvents; i++) {
    String body = "event #" + i + " of " + numberOfEvents;
    Event event = EventBuilder.withBody(body.getBytes());
    events[i] = event;
    channel.put(event);
  }
  tx.commit();
  tx.close();

  int count = 0;
  Status status = Status.READY;
  while (status != Status.BACKOFF) {
    count++;
    status = fixture.process();
  }
  fixture.stop();

  assertEquals(3, count);

  client.admin().indices()
      .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet();
  assertMatchAllQuery(numberOfEvents, events);
  assertBodyQuery(5, events);
}
 
开发者ID:Redliver,项目名称:flume-ng-elasticsearch5-sink,代码行数:36,代码来源:TestElasticSearchSink.java


示例14: ensureRollbackBackoffAndIncrementMetricsIfConfigured

import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void ensureRollbackBackoffAndIncrementMetricsIfConfigured() throws Exception {
    when(channel.take()).thenReturn(event);
    when(event.getBody()).thenReturn("something".getBytes());

    Context context = new Context();
    context.put("defaultRollback", "true");
    context.put("defaultBackoff", "true");
    context.put("defaultIncrementMetrics", "true");

    executeWithMocks(false, Status.BACKOFF, true, true, context, HttpURLConnection.HTTP_OK);
}
 
开发者ID:hmrc,项目名称:flume-http-sink,代码行数:13,代码来源:HttpSinkTest.java


示例15: ensureCommitReadyAndNoIncrementMetricsIfConfigured

import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void ensureCommitReadyAndNoIncrementMetricsIfConfigured() throws Exception {
    when(channel.take()).thenReturn(event);
    when(event.getBody()).thenReturn("something".getBytes());

    Context context = new Context();
    context.put("defaultRollback", "false");
    context.put("defaultBackoff", "false");
    context.put("defaultIncrementMetrics", "false");

    executeWithMocks(true, Status.READY, false, false, context, HttpURLConnection.HTTP_OK);
}
 
开发者ID:hmrc,项目名称:flume-http-sink,代码行数:13,代码来源:HttpSinkTest.java


示例16: ensureSingleStatusConfigurationCorrectlyUsed

import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void ensureSingleStatusConfigurationCorrectlyUsed() throws Exception {
    when(channel.take()).thenReturn(event);
    when(event.getBody()).thenReturn("something".getBytes());

    Context context = new Context();
    context.put("defaultRollback", "true");
    context.put("defaultBackoff", "true");
    context.put("defaultIncrementMetrics", "false");
    context.put("rollback.200", "false");
    context.put("backoff.200", "false");
    context.put("incrementMetrics.200", "true");

    executeWithMocks(true, Status.READY, true, true, context, HttpURLConnection.HTTP_OK);
}
 
开发者ID:hmrc,项目名称:flume-http-sink,代码行数:16,代码来源:HttpSinkTest.java


示例17: process

import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Override
public Status process() throws EventDeliveryException {
    if (invalidConfiguration) {
        return Status.BACKOFF;
    } else if (rollbackedAccumulations.isEmpty()) {
        return processNewBatches();
    } else {
        processRollbackedBatches();
        return processNewBatches();
    } // if else
}
 
开发者ID:telefonicaid,项目名称:fiware-cygnus,代码行数:12,代码来源:NGSISink.java


示例18: testProcessStatusReady

import org.apache.flume.Sink.Status; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Test
public void testProcessStatusReady() throws EventDeliveryException, SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException {
	when(mockEvent.getBody()).thenReturn("frank".getBytes());
	Status status = mockKafkaSink.process();
	verify(mockChannel, times(1)).getTransaction();
	verify(mockChannel, times(1)).take();
	verify(mockProducer, times(1)).send((KeyedMessage<byte[], byte[]>) any());
	verify(mockTx, times(1)).commit();
	verify(mockTx, times(0)).rollback();
	verify(mockTx, times(1)).close();
	assertEquals(Status.READY, status);
}
 
开发者ID:keedio,项目名称:flume-ng-kafka-avro-sink,代码行数:14,代码来源:KafkaAvroSinkTest.java


示例19: testProcessStatusBackoff

import org.apache.flume.Sink.Status; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Test
public void testProcessStatusBackoff() throws EventDeliveryException, SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException {
	when(mockEvent.getBody()).thenThrow(new RuntimeException());
	Status status = mockKafkaSink.process();
	verify(mockChannel, times(1)).getTransaction();
	verify(mockChannel, times(1)).take();
	verify(mockProducer, times(0)).send((KeyedMessage<byte[], byte[]>) any());
	verify(mockTx, times(0)).commit();
	verify(mockTx, times(1)).rollback();
	verify(mockTx, times(1)).close();
	assertEquals(Status.BACKOFF, status);
}
 
开发者ID:keedio,项目名称:flume-ng-kafka-avro-sink,代码行数:14,代码来源:KafkaAvroSinkTest.java


示例20: timestampFieldAllowsDatesWithTheFormatDefined

import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Ignore
@Test
public void timestampFieldAllowsDatesWithTheFormatDefined() {
  testFieldType(TIMESTAMP_FIELD, "1231234", Status.READY);
  testFieldType(TIMESTAMP_FIELD, "2010-12-20T10:20:20", Status.READY);
  testFieldType(TIMESTAMP_FIELD, "2010-12-20T10:20:20.000", Status.READY);
}
 
开发者ID:Stratio,项目名称:ingestion,代码行数:8,代码来源:CassandraDataTypesIT.java



注:本文中的org.apache.flume.Sink.Status类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java Credential类代码示例发布时间:2022-05-23
下一篇:
Java BlockInfoUnderConstruction类代码示例发布时间:2022-05-23
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap