• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java SourceTaskContext类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.kafka.connect.source.SourceTaskContext的典型用法代码示例。如果您正苦于以下问题:Java SourceTaskContext类的具体用法?Java SourceTaskContext怎么用?Java SourceTaskContext使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



SourceTaskContext类属于org.apache.kafka.connect.source包,在下文中一共展示了SourceTaskContext类的17个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: before

import org.apache.kafka.connect.source.SourceTaskContext; //导入依赖的package包/类
@BeforeEach
public void before() {
  this.sourceTaskContext = mock(SourceTaskContext.class);
  this.offsetStorageReader = mock(OffsetStorageReader.class);
  when(this.sourceTaskContext.offsetStorageReader()).thenReturn(this.offsetStorageReader);
  this.task = new KinesisSourceTask();
  this.task.initialize(this.sourceTaskContext);
  this.kinesisClient = mock(AmazonKinesis.class);
  this.task.time = mock(Time.class);
  this.task.kinesisClientFactory = mock(KinesisClientFactory.class);
  when(this.task.kinesisClientFactory.create(any())).thenReturn(this.kinesisClient);

  this.settings = TestData.settings();
  this.config = new KinesisSourceConnectorConfig(this.settings);

}
 
开发者ID:jcustenborder,项目名称:kafka-connect-kinesis,代码行数:17,代码来源:KinesisSourceTaskTest.java


示例2: setup

import org.apache.kafka.connect.source.SourceTaskContext; //导入依赖的package包/类
@Before
public void setup() throws IOException, SQLException {
    String mysqlHost = "10.100.172.86";
    connection = DriverManager.getConnection("jdbc:mysql://" + mysqlHost + ":3306/mysql", "root", "passwd");
    
    config = new HashMap<>();
    config.put(MySqlSourceConnector.USER_CONFIG, "maxwell");
    config.put(MySqlSourceConnector.PASSWORD_CONFIG, "XXXXXX");
    config.put(MySqlSourceConnector.PORT_CONFIG, "3306");
    config.put(MySqlSourceConnector.HOST_CONFIG, mysqlHost);
    
    task = new MySqlSourceTask();
    offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class);
    context = PowerMock.createMock(SourceTaskContext.class);
    task.initialize(context);

    runSql("drop table if exists test.users");
    runSql("drop database if exists test");
}
 
开发者ID:wushujames,项目名称:kafka-mysql-connector,代码行数:20,代码来源:MySqlSourceTaskTest.java


示例3: setup

import org.apache.kafka.connect.source.SourceTaskContext; //导入依赖的package包/类
@Before
public void setup() throws IOException {
    tempFile = File.createTempFile("file-stream-source-task-test", null);
    config = new HashMap<>();
    config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsolutePath());
    config.put(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC);
    task = new FileStreamSourceTask();
    offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class);
    context = PowerMock.createMock(SourceTaskContext.class);
    task.initialize(context);
}
 
开发者ID:wngn123,项目名称:wngn-jms-kafka,代码行数:12,代码来源:FileStreamSourceTaskTest.java


示例4: testPollsInBackground

import org.apache.kafka.connect.source.SourceTaskContext; //导入依赖的package包/类
@Test
public void testPollsInBackground() throws Exception {
    createWorkerTask();

    sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
    EasyMock.expectLastCall();
    sourceTask.start(TASK_PROPS);
    EasyMock.expectLastCall();
    statusListener.onStartup(taskId);
    EasyMock.expectLastCall();

    final CountDownLatch pollLatch = expectPolls(10);
    // In this test, we don't flush, so nothing goes any further than the offset writer

    sourceTask.stop();
    EasyMock.expectLastCall();
    expectOffsetFlush(true);

    statusListener.onShutdown(taskId);
    EasyMock.expectLastCall();

    producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
    EasyMock.expectLastCall();

    transformationChain.close();
    EasyMock.expectLastCall();

    PowerMock.replayAll();

    workerTask.initialize(TASK_CONFIG);
    Future<?> taskFuture = executor.submit(workerTask);

    assertTrue(awaitLatch(pollLatch));
    workerTask.stop();
    assertTrue(workerTask.awaitStop(1000));

    taskFuture.get();

    PowerMock.verifyAll();
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:41,代码来源:WorkerSourceTaskTest.java


示例5: initTask

import org.apache.kafka.connect.source.SourceTaskContext; //导入依赖的package包/类
@Before
public void initTask() {
    task = new FsSourceTask();
    taskConfig = new HashMap<String, String>() {{
        String uris[] = directories.stream().map(dir -> dir.toString())
                .toArray(size -> new String[size]);
        put(FsSourceTaskConfig.FS_URIS, String.join(",", uris));
        put(FsSourceTaskConfig.TOPIC, "topic_test");
        put(FsSourceTaskConfig.POLICY_CLASS, SimplePolicy.class.getName());
        put(FsSourceTaskConfig.FILE_READER_CLASS, TextFileReader.class.getName());
        put(FsSourceTaskConfig.POLICY_REGEXP, "^[0-9]*\\.txt$");
    }};

    //Mock initialization
    taskContext = PowerMock.createMock(SourceTaskContext.class);
    offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class);

    EasyMock.expect(taskContext.offsetStorageReader())
            .andReturn(offsetStorageReader);

    EasyMock.expect(taskContext.offsetStorageReader())
            .andReturn(offsetStorageReader);

    EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject()))
            .andReturn(new HashMap<String, Object>() {{
                put("offset", 5L);
            }});
    EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject()))
            .andReturn(new HashMap<String, Object>() {{
                put("offset", 5L);
            }});

    EasyMock.checkOrder(taskContext, false);
    EasyMock.replay(taskContext);

    EasyMock.checkOrder(offsetStorageReader, false);
    EasyMock.replay(offsetStorageReader);

    task.initialize(taskContext);

}
 
开发者ID:mmolimar,项目名称:kafka-connect-fs,代码行数:42,代码来源:FsSourceTaskTestBase.java


示例6: testPause

import org.apache.kafka.connect.source.SourceTaskContext; //导入依赖的package包/类
@Test
public void testPause() throws Exception {
    createWorkerTask();

    sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
    EasyMock.expectLastCall();
    sourceTask.start(TASK_PROPS);
    EasyMock.expectLastCall();
    statusListener.onStartup(taskId);
    EasyMock.expectLastCall();

    AtomicInteger count = new AtomicInteger(0);
    CountDownLatch pollLatch = expectPolls(10, count);
    // In this test, we don't flush, so nothing goes any further than the offset writer

    statusListener.onPause(taskId);
    EasyMock.expectLastCall();

    sourceTask.stop();
    EasyMock.expectLastCall();
    expectOffsetFlush(true);

    statusListener.onShutdown(taskId);
    EasyMock.expectLastCall();

    producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
    EasyMock.expectLastCall();

    transformationChain.close();
    EasyMock.expectLastCall();

    PowerMock.replayAll();

    workerTask.initialize(TASK_CONFIG);
    Future<?> taskFuture = executor.submit(workerTask);
    assertTrue(awaitLatch(pollLatch));

    workerTask.transitionTo(TargetState.PAUSED);

    int priorCount = count.get();
    Thread.sleep(100);

    // since the transition is observed asynchronously, the count could be off by one loop iteration
    assertTrue(count.get() - priorCount <= 1);

    workerTask.stop();
    assertTrue(workerTask.awaitStop(1000));

    taskFuture.get();

    PowerMock.verifyAll();
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:53,代码来源:WorkerSourceTaskTest.java


示例7: testFailureInPoll

import org.apache.kafka.connect.source.SourceTaskContext; //导入依赖的package包/类
@Test
public void testFailureInPoll() throws Exception {
    createWorkerTask();

    sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
    EasyMock.expectLastCall();
    sourceTask.start(TASK_PROPS);
    EasyMock.expectLastCall();
    statusListener.onStartup(taskId);
    EasyMock.expectLastCall();

    final CountDownLatch pollLatch = new CountDownLatch(1);
    final RuntimeException exception = new RuntimeException();
    EasyMock.expect(sourceTask.poll()).andAnswer(new IAnswer<List<SourceRecord>>() {
        @Override
        public List<SourceRecord> answer() throws Throwable {
            pollLatch.countDown();
            throw exception;
        }
    });

    statusListener.onFailure(taskId, exception);
    EasyMock.expectLastCall();

    sourceTask.stop();
    EasyMock.expectLastCall();
    expectOffsetFlush(true);

    producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
    EasyMock.expectLastCall();

    transformationChain.close();
    EasyMock.expectLastCall();

    PowerMock.replayAll();

    workerTask.initialize(TASK_CONFIG);
    Future<?> taskFuture = executor.submit(workerTask);

    assertTrue(awaitLatch(pollLatch));
    workerTask.stop();
    assertTrue(workerTask.awaitStop(1000));

    taskFuture.get();

    PowerMock.verifyAll();
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:48,代码来源:WorkerSourceTaskTest.java


示例8: testCommit

import org.apache.kafka.connect.source.SourceTaskContext; //导入依赖的package包/类
@Test
public void testCommit() throws Exception {
    // Test that the task commits properly when prompted
    createWorkerTask();

    sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
    EasyMock.expectLastCall();
    sourceTask.start(TASK_PROPS);
    EasyMock.expectLastCall();
    statusListener.onStartup(taskId);
    EasyMock.expectLastCall();

    // We'll wait for some data, then trigger a flush
    final CountDownLatch pollLatch = expectPolls(1);
    expectOffsetFlush(true);

    sourceTask.stop();
    EasyMock.expectLastCall();
    expectOffsetFlush(true);

    statusListener.onShutdown(taskId);
    EasyMock.expectLastCall();

    producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
    EasyMock.expectLastCall();

    transformationChain.close();
    EasyMock.expectLastCall();

    PowerMock.replayAll();

    workerTask.initialize(TASK_CONFIG);
    Future<?> taskFuture = executor.submit(workerTask);

    assertTrue(awaitLatch(pollLatch));
    assertTrue(workerTask.commitOffsets());
    workerTask.stop();
    assertTrue(workerTask.awaitStop(1000));

    taskFuture.get();

    PowerMock.verifyAll();
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:44,代码来源:WorkerSourceTaskTest.java


示例9: testCommitFailure

import org.apache.kafka.connect.source.SourceTaskContext; //导入依赖的package包/类
@Test
public void testCommitFailure() throws Exception {
    // Test that the task commits properly when prompted
    createWorkerTask();

    sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
    EasyMock.expectLastCall();
    sourceTask.start(TASK_PROPS);
    EasyMock.expectLastCall();
    statusListener.onStartup(taskId);
    EasyMock.expectLastCall();

    // We'll wait for some data, then trigger a flush
    final CountDownLatch pollLatch = expectPolls(1);
    expectOffsetFlush(true);

    sourceTask.stop();
    EasyMock.expectLastCall();
    expectOffsetFlush(false);

    statusListener.onShutdown(taskId);
    EasyMock.expectLastCall();

    producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
    EasyMock.expectLastCall();

    transformationChain.close();
    EasyMock.expectLastCall();

    PowerMock.replayAll();

    workerTask.initialize(TASK_CONFIG);
    Future<?> taskFuture = executor.submit(workerTask);

    assertTrue(awaitLatch(pollLatch));
    assertTrue(workerTask.commitOffsets());
    workerTask.stop();
    assertTrue(workerTask.awaitStop(1000));

    taskFuture.get();

    PowerMock.verifyAll();
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:44,代码来源:WorkerSourceTaskTest.java


示例10: testSlowTaskStart

import org.apache.kafka.connect.source.SourceTaskContext; //导入依赖的package包/类
@Test
public void testSlowTaskStart() throws Exception {
    final CountDownLatch startupLatch = new CountDownLatch(1);
    final CountDownLatch finishStartupLatch = new CountDownLatch(1);

    createWorkerTask();

    sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
    EasyMock.expectLastCall();
    sourceTask.start(TASK_PROPS);
    EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
        @Override
        public Object answer() throws Throwable {
            startupLatch.countDown();
            assertTrue(awaitLatch(finishStartupLatch));
            return null;
        }
    });

    statusListener.onStartup(taskId);
    EasyMock.expectLastCall();

    sourceTask.stop();
    EasyMock.expectLastCall();
    expectOffsetFlush(true);

    statusListener.onShutdown(taskId);
    EasyMock.expectLastCall();

    producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
    EasyMock.expectLastCall();

    transformationChain.close();
    EasyMock.expectLastCall();

    PowerMock.replayAll();

    workerTask.initialize(TASK_CONFIG);
    Future<?> workerTaskFuture = executor.submit(workerTask);

    // Stopping immediately while the other thread has work to do should result in no polling, no offset commits,
    // exiting the work thread immediately, and the stop() method will be invoked in the background thread since it
    // cannot be invoked immediately in the thread trying to stop the task.
    assertTrue(awaitLatch(startupLatch));
    workerTask.stop();
    finishStartupLatch.countDown();
    assertTrue(workerTask.awaitStop(1000));

    workerTaskFuture.get();

    PowerMock.verifyAll();
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:53,代码来源:WorkerSourceTaskTest.java


示例11: poll

import org.apache.kafka.connect.source.SourceTaskContext; //导入依赖的package包/类
protected void poll(final String packageName, TestCase testCase) throws InterruptedException, IOException {
  String keySchemaConfig = ObjectMapperFactory.INSTANCE.writeValueAsString(testCase.keySchema);
  String valueSchemaConfig = ObjectMapperFactory.INSTANCE.writeValueAsString(testCase.valueSchema);

  Map<String, String> settings = Maps.newLinkedHashMap();
  settings.put(SpoolDirSourceConnectorConfig.INPUT_PATH_CONFIG, this.inputPath.getAbsolutePath());
  settings.put(SpoolDirSourceConnectorConfig.FINISHED_PATH_CONFIG, this.finishedPath.getAbsolutePath());
  settings.put(SpoolDirSourceConnectorConfig.ERROR_PATH_CONFIG, this.errorPath.getAbsolutePath());
  settings.put(SpoolDirSourceConnectorConfig.INPUT_FILE_PATTERN_CONF, String.format("^.*\\.%s", packageName));
  settings.put(SpoolDirSourceConnectorConfig.TOPIC_CONF, "testing");
  settings.put(SpoolDirSourceConnectorConfig.KEY_SCHEMA_CONF, keySchemaConfig);
  settings.put(SpoolDirSourceConnectorConfig.VALUE_SCHEMA_CONF, valueSchemaConfig);
  settings.put(SpoolDirSourceConnectorConfig.EMPTY_POLL_WAIT_MS_CONF, "10");
  settings(settings);
  if (null != testCase.settings && !testCase.settings.isEmpty()) {
    settings.putAll(testCase.settings);
  }

  this.task = createTask();

  SourceTaskContext sourceTaskContext = mock(SourceTaskContext.class);
  OffsetStorageReader offsetStorageReader = mock(OffsetStorageReader.class);
  when(offsetStorageReader.offset(anyMap())).thenReturn(testCase.offset);
  when(sourceTaskContext.offsetStorageReader()).thenReturn(offsetStorageReader);
  this.task.initialize(sourceTaskContext);

  this.task.start(settings);

  String dataFile = new File(packageName, Files.getNameWithoutExtension(testCase.path.toString())) + ".data";
  log.trace("poll(String, TestCase) - dataFile={}", dataFile);

  String inputFileName = String.format("%s.%s",
      Files.getNameWithoutExtension(testCase.path.toString()),
      packageName
  );


  final File inputFile = new File(this.inputPath, inputFileName);
  log.trace("poll(String, TestCase) - inputFile = {}", inputFile);
  final File processingFile = this.task.processingFile(inputFile);
  try (InputStream inputStream = this.getClass().getResourceAsStream(dataFile)) {
    try (OutputStream outputStream = new FileOutputStream(inputFile)) {
      ByteStreams.copy(inputStream, outputStream);
    }
  }

  assertFalse(processingFile.exists(), String.format("processingFile %s should not exist before first poll().", processingFile));
  assertTrue(inputFile.exists(), String.format("inputFile %s should exist.", inputFile));
  List<SourceRecord> records = this.task.poll();
  assertTrue(inputFile.exists(), String.format("inputFile %s should exist after first poll().", inputFile));
  assertTrue(processingFile.exists(), String.format("processingFile %s should exist after first poll().", processingFile));

  assertNotNull(records, "records should not be null.");
  assertFalse(records.isEmpty(), "records should not be empty");
  assertEquals(testCase.expected.size(), records.size(), "records.size() does not match.");

  for (int i = 0; i < testCase.expected.size(); i++) {
    SourceRecord expectedRecord = testCase.expected.get(i);
    SourceRecord actualRecord = records.get(i);
    assertSourceRecord(expectedRecord, actualRecord, String.format("index:%s", i));
  }

  records = this.task.poll();
  assertTrue(records.isEmpty(), "records should be null after first poll.");
  records = this.task.poll();
  assertTrue(records.isEmpty(), "records should be null after first poll.");
  assertFalse(inputFile.exists(), String.format("inputFile %s should not exist.", inputFile));
  assertFalse(processingFile.exists(), String.format("processingFile %s should not exist.", processingFile));

  assertTrue(records.isEmpty(), "records should be empty.");

  final File finishedFile = new File(this.finishedPath, inputFileName);
  assertTrue(finishedFile.exists(), String.format("finishedFile %s should exist.", finishedFile));

}
 
开发者ID:jcustenborder,项目名称:kafka-connect-spooldir,代码行数:76,代码来源:SpoolDirSourceTaskTest.java


示例12: initialize

import org.apache.kafka.connect.source.SourceTaskContext; //导入依赖的package包/类
@Override
public void initialize(SourceTaskContext context) {

	super.initialize(context);
	LOG.info("AMQP source task initialized");
}
 
开发者ID:ppatierno,项目名称:kafka-connect-amqp,代码行数:7,代码来源:AmqpSourceTask.java


示例13: setUp

import org.apache.kafka.connect.source.SourceTaskContext; //导入依赖的package包/类
@Override
    public void setUp() {
        offsets = new HashMap<>();
        totalWrittenDocuments = 0;
        try {
            super.setUp();
            mongodStarter = MongodStarter.getDefaultInstance();
            mongodConfig = new MongodConfigBuilder()
                    .version(Version.Main.V3_2)
                    .replication(new Storage(REPLICATION_PATH, "rs0", 1024))
                    .net(new Net(12345, Network.localhostIsIPv6()))
                    .build();
            mongodExecutable = mongodStarter.prepare(mongodConfig);
            mongod = mongodExecutable.start();
            mongoClient = new MongoClient(new ServerAddress("localhost", 12345));
            MongoDatabase adminDatabase = mongoClient.getDatabase("admin");

            BasicDBObject replicaSetSetting = new BasicDBObject();
            replicaSetSetting.put("_id", "rs0");
            BasicDBList members = new BasicDBList();
            DBObject host = new BasicDBObject();
            host.put("_id", 0);
            host.put("host", "127.0.0.1:12345");
            members.add(host);
            replicaSetSetting.put("members", members);
            adminDatabase.runCommand(new BasicDBObject("isMaster", 1));
            adminDatabase.runCommand(new BasicDBObject("replSetInitiate", replicaSetSetting));
            MongoDatabase db = mongoClient.getDatabase("mydb");
            db.createCollection("test1");
            db.createCollection("test2");
            db.createCollection("test3");
        } catch (Exception e) {
//                Assert.assertTrue(false);
        }

        task = new MongodbSourceTask();

        offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class);
        context = PowerMock.createMock(SourceTaskContext.class);
        task.initialize(context);

        sourceProperties = new HashMap<>();
        sourceProperties.put("uri", "mongodb://localhost:12345");
        sourceProperties.put("batch.size", Integer.toString(100));
        sourceProperties.put("schema.name", "schema");
        sourceProperties.put("topic.prefix", "prefix");
        sourceProperties.put("databases", "mydb.test1,mydb.test2,mydb.test3");

    }
 
开发者ID:DataReply,项目名称:kafka-connect-mongodb,代码行数:50,代码来源:MongodbSourceUriTaskTest.java


示例14: setUp

import org.apache.kafka.connect.source.SourceTaskContext; //导入依赖的package包/类
@Override
    public void setUp() {
        offsets = new HashMap<>();
        totalWrittenDocuments = 0;
        try {
            super.setUp();
            mongodStarter = MongodStarter.getDefaultInstance();
            mongodConfig = new MongodConfigBuilder()
                    .version(Version.Main.V3_2)
                    .replication(new Storage(REPLICATION_PATH, "rs0", 1024))
                    .net(new Net(12345, Network.localhostIsIPv6()))
                    .build();
            mongodExecutable = mongodStarter.prepare(mongodConfig);
            mongod = mongodExecutable.start();
            mongoClient = new MongoClient(new ServerAddress("localhost", 12345));
            MongoDatabase adminDatabase = mongoClient.getDatabase("admin");

            BasicDBObject replicaSetSetting = new BasicDBObject();
            replicaSetSetting.put("_id", "rs0");
            BasicDBList members = new BasicDBList();
            DBObject host = new BasicDBObject();
            host.put("_id", 0);
            host.put("host", "127.0.0.1:12345");
            members.add(host);
            replicaSetSetting.put("members", members);
            adminDatabase.runCommand(new BasicDBObject("isMaster", 1));
            adminDatabase.runCommand(new BasicDBObject("replSetInitiate", replicaSetSetting));
            MongoDatabase db = mongoClient.getDatabase("mydb");
            db.createCollection("test1");
            db.createCollection("test2");
            db.createCollection("test3");
        } catch (Exception e) {
//                Assert.assertTrue(false);
        }

        task = new MongodbSourceTask();

        offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class);
        context = PowerMock.createMock(SourceTaskContext.class);
        task.initialize(context);

        sourceProperties = new HashMap<>();
        sourceProperties.put("host", "localhost");
        sourceProperties.put("port", Integer.toString(12345));
        sourceProperties.put("batch.size", Integer.toString(100));
        sourceProperties.put("schema.name", "schema");
        sourceProperties.put("topic.prefix", "prefix");
        sourceProperties.put("databases", "mydb.test1,mydb.test2,mydb.test3");

    }
 
开发者ID:DataReply,项目名称:kafka-connect-mongodb,代码行数:51,代码来源:MongodbSourceTaskTest.java


示例15: initialize

import org.apache.kafka.connect.source.SourceTaskContext; //导入依赖的package包/类
@Override
public void initialize(SourceTaskContext context) {
    super.initialize(context);
}
 
开发者ID:yaravind,项目名称:kafka-connect-jenkins,代码行数:5,代码来源:JenkinsSourceTask.java


示例16: testSourceTaskProcessAll

import org.apache.kafka.connect.source.SourceTaskContext; //导入依赖的package包/类
@Test
public void testSourceTaskProcessAll() {
    logger.info (
        "Setting up stubs for clean run with no published messages"
    );
    
    SourceTaskContext context = EasyMock.createMock(
        SourceTaskContext.class
    );
    
    OffsetStorageReader offsetStorageReader = EasyMock.createMock (
        OffsetStorageReader.class
    );
    
    EasyMock.expect(context.offsetStorageReader())
        .andReturn(offsetStorageReader);
    
    EasyMock.expect(offsetStorageReader.offsets(offsetPartitions()))
        .andReturn(new HashMap<Map<String, String>, Map<String, Object>>());
    
    EasyMock.checkOrder(context, false);
    EasyMock.replay(context);
    
    EasyMock.checkOrder(offsetStorageReader, false);
    EasyMock.replay(offsetStorageReader);
    
    logger.info ("Source task poll all - test");
    
    ReplicateSourceTask sourceTask = new ReplicateSourceTask();
    
    sourceTask.initialize(context);
    
    try {
        sourceTask.start(getConfigPropsForSourceTask());
        
        assertEquals (
            "Expecting no record for first poll, nothing in PLOG", 
            null, 
            sourceTask.poll()
        );
        
        /* expect 2 messages per PLOG, 3 PLOGs with data */
        List<SourceRecord> records = sourceTask.poll();
        validateRecordCount (records.size());
        validateTopics (records);
        
        records = sourceTask.poll();
        validateRecordCount (records.size());
        validateTopics (records);
        
        records = sourceTask.poll();
        validateRecordCount (records.size());
        validateTopics (records);
    }
    catch (InterruptedException ie) {
        logger.info ("Stopping task");
    }
    catch (Exception e) {
        e.printStackTrace();
        fail (e.getMessage());
    }
    finally {
        sourceTask.stop();
    }
}
 
开发者ID:dbvisitsoftware,项目名称:replicate-connector-for-kafka,代码行数:66,代码来源:ReplicateSourceTaskTest.java


示例17: testSourceTaskProcessTwoMessagesBySCN

import org.apache.kafka.connect.source.SourceTaskContext; //导入依赖的package包/类
@Test
public void testSourceTaskProcessTwoMessagesBySCN() {
    logger.info (
        "Setting up stubs using global SCN filter for fake cold start"
    );
    
    SourceTaskContext context = EasyMock.createMock(
        SourceTaskContext.class
    );
    
    OffsetStorageReader offsetStorageReader = EasyMock.createMock (
        OffsetStorageReader.class
    );
    
    EasyMock.expect(context.offsetStorageReader())
        .andReturn(offsetStorageReader);
    
    EasyMock.expect(offsetStorageReader.offsets(offsetPartitions()))
        .andReturn(new HashMap<Map<String, String>, Map<String, Object>>());
    
    EasyMock.checkOrder(context, false);
    EasyMock.replay(context);
    
    EasyMock.checkOrder(offsetStorageReader, false);
    EasyMock.replay(offsetStorageReader);
    
    logger.info ("Source task poll all committed, no new data");
    
    ReplicateSourceTask sourceTask = new ReplicateSourceTask();
    
    sourceTask.initialize(context);
    
    sourceTask.start(
        getConfigPropsForSourceTaskWithStartSCN(PLOG_22_TRANSACTION_SCN)
    );
    
    try {
        /* expect no new messages to publish for first poll */
        List<SourceRecord> records = sourceTask.poll();
        assertNull (
            "Expecting no records for first PLOG 20, contains no data",
            records
        );
        
        records = sourceTask.poll();
        assertNull (
            "Expecting no records for second PLOG",
            records
        );
        
        /* expect 2 messages for last two PLOG */
        records = sourceTask.poll();
        validateRecordCount (records.size());
        validateTopics (records);
        validateRecordOffset(records, PLOG_21_TRANSACTION_OFFSET);
        
        records = sourceTask.poll();
        validateRecordCount (records.size());
        validateTopics (records);
        validateRecordOffset(records, PLOG_22_TRANSACTION_OFFSET);
        
        /* next call will block because there is no further data */
    }
    catch (InterruptedException ie) {
        logger.info ("Stopping task");
    }
    catch (Exception e) {
        e.printStackTrace();
        fail (e.getMessage());
    }
    finally {
        sourceTask.stop();
    }
}
 
开发者ID:dbvisitsoftware,项目名称:replicate-connector-for-kafka,代码行数:75,代码来源:ReplicateSourceTaskTest.java



注:本文中的org.apache.kafka.connect.source.SourceTaskContext类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java Whirlpool0类代码示例发布时间:2022-05-22
下一篇:
Java Tree类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap