
Java CanalConnectors Class Code Examples


This article collects typical usage examples of com.alibaba.otter.canal.client.CanalConnectors in Java. If you are wondering how the CanalConnectors class is used in practice, the selected examples below may help.



The CanalConnectors class belongs to the com.alibaba.otter.canal.client package. The 11 code examples below are sorted by popularity by default.

Example 1: initCanalStart

import com.alibaba.otter.canal.client.CanalConnectors; // import the required package/class
public void initCanalStart() {
    List<String> destinations = canalProperties.getDestination();
    final List<CanalClient> canalClientList = new ArrayList<>();
    if (destinations != null && !destinations.isEmpty()) {
        for (String destination : destinations) {
            // Resolve canal server addresses dynamically through ZooKeeper; if one server crashes, the connector can fail over
            CanalConnector connector = CanalConnectors.newClusterConnector(canalProperties.getZkServers(), destination, "", "");
            CanalClient client = new CanalClient(destination, connector);
            canalClientList.add(client);
            client.start();
        }
    }
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            try {
                logger.info("## stop the canal client");
                for (CanalClient canalClient : canalClientList) {
                    canalClient.stop();
                }
            } catch (Throwable e) {
                logger.warn("##something goes wrong when stopping canal:", e);
            } finally {
                logger.info("## canal client is down.");
            }
        }

    });
}
 
Developer ID: zhangtr, Project: canal-mongo, Lines of code: 29, Source: CanalInitHandler.java
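
The shutdown hook in Example 1 stops every client inside a single try block, so an exception from one client would leave the remaining clients running. A minimal sketch of an alternative that isolates failures per resource is shown here. The class ShutdownSketch and its methods closeAll and registerHook are hypothetical names introduced for illustration, and the sketch assumes each client can be adapted to java.lang.AutoCloseable; none of this comes from the canal-mongo project.

```java
import java.util.List;

final class ShutdownSketch {

    /** Closes every resource, continuing past failures; returns how many closes failed. */
    public static int closeAll(List<? extends AutoCloseable> resources) {
        int failures = 0;
        for (AutoCloseable resource : resources) {
            try {
                resource.close();
            } catch (Exception e) {
                // Record the failure and keep going so later resources are still closed
                failures++;
                System.err.println("failed to close " + resource + ": " + e);
            }
        }
        return failures;
    }

    /** Registers a JVM shutdown hook that closes all given resources. */
    public static Thread registerHook(List<? extends AutoCloseable> resources) {
        Thread hook = new Thread(() -> closeAll(resources), "client-shutdown");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```

If a client's stop() method declares no checked exceptions, a method reference such as client::stop could serve as the AutoCloseable passed to registerHook.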


Example 2: CanalEmbedSelector

import com.alibaba.otter.canal.client.CanalConnectors; // import the required package/class
public CanalEmbedSelector(CanalConf conf) {
    logger.info("TotoroSelector init start, conf: {}", conf);

    this.mode = conf.getMode();
    this.destination = conf.getDestination();
    this.filterPatten = conf.getFilterPatten();

    String userName = conf.getUserName();
    String passWord = conf.getPassWord();

    if (Mode.SIGN.equals(mode)) {
        // Single-server mode: the address is expected in "host:port" form
        String address = conf.getAddress();
        String[] hostPort = address.split(":");
        if (hostPort.length != 2) {
            throw new TotoroException("Invalid address: " + address);
        }
        String ip = hostPort[0];
        int port = Integer.parseInt(hostPort[1]);

        SocketAddress socketAddress = new InetSocketAddress(ip, port);
        connector = CanalConnectors.newSingleConnector(socketAddress, destination, userName, passWord);
    } else if (Mode.CLUSTER.equals(mode)) {
        // Cluster mode: canal server addresses are discovered through ZooKeeper
        String zkAddress = conf.getZkAddress();
        connector = CanalConnectors.newClusterConnector(zkAddress, destination, userName, passWord);
    } else {
        throw new TotoroException("Invalid mode");
    }
    logger.info("TotoroSelector init complete");
}

Developer ID: zhongchengxcr, Project: canal-elasticsearch, Lines of code: 35, Source: CanalEmbedSelector.java


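Example 3: main

import com.alibaba.otter.canal.client.CanalConnectors; // import the required package/class
public static void main(String[] args) throws InterruptedException {
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),
            "totoro",
            "",
            "");

    connector.connect();
    connector.subscribe();
    int emptyTimes = 0;

    try {
        // `running` and `applyWait` are members of the enclosing class and are not shown in this excerpt
        while (running) {
            // Fetch a batch of up to 5 * 1024 entries without acknowledging it
            Message message = connector.getWithoutAck(5 * 1024);

            if (message == null || message.getId() == -1L) {
                // Nothing to consume: back off before polling again
                applyWait(emptyTimes++);
            } else {
                //logger.info(message.toString());
                long messageId = message.getId();
                System.out.println("message id: " + messageId);
                Thread.sleep(1000);

                // Roll back instead of acking, so the batch is delivered again on the next fetch
                connector.rollback();
            }
        }
    } finally {
        // Release the connection when the loop ends or an exception escapes
        connector.disconnect();
    }
}

Developer ID: zhongchengxcr, Project: canal-elasticsearch, Lines of code: 30, Source: LockTest.java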
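
Example 3 calls applyWait(emptyTimes++) without showing its body. A minimal sketch of what such an idle back-off helper might look like follows, assuming a linear back-off with an upper bound. The class name BackoffSketch, the method backoffMillis, and both constants are hypothetical and are not taken from the canal-elasticsearch project.

```java
final class BackoffSketch {

    // Hypothetical tuning values, chosen only for illustration
    static final long STEP_MILLIS = 100L;
    static final long MAX_MILLIS = 3000L;

    /** Returns how long to sleep after the given number of consecutive empty polls. */
    public static long backoffMillis(int emptyTimes) {
        if (emptyTimes <= 0) {
            return 0L;
        }
        // Widen to long before multiplying so large counts cannot overflow before the cap applies
        return Math.min((long) emptyTimes * STEP_MILLIS, MAX_MILLIS);
    }

    /** Sleeps for the computed back-off; restores the interrupt flag if interrupted. */
    public static void applyWait(int emptyTimes) {
        long millis = backoffMillis(emptyTimes);
        if (millis == 0L) {
            Thread.yield();
            return;
        }
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With a helper like this, a caller would typically reset its empty-poll counter to zero after receiving a non-empty message, which Example 3 does not do.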


Example 4: init

import com.alibaba.otter.canal.client.CanalConnectors; // import the required package/class
/**
 * @description Canal service configuration
 * @author yi.zhang
 * @time 2017-04-19 10:38:42 AM
 */
public void init(String destination, String servers, String username, String password, String filter_regex, boolean isZookeeper, Integer batch_size) {
    try {
        if (filter_regex != null) {
            CANAL_FILTER_REGEX = filter_regex;
        }
        if (batch_size != null) {
            BATCH_SIZE = batch_size;
        }
        if (isZookeeper) {
            // `servers` is a ZooKeeper address string
            connector = CanalConnectors.newClusterConnector(servers, destination, username, password);
        } else {
            // `servers` is a comma-separated list of host[:port] entries
            List<SocketAddress> addresses = new ArrayList<SocketAddress>();
            for (String address : servers.split(",")) {
                String[] ips = address.split(":");
                String ip = ips[0];
                int port = 11111; // default port when none is given
                if (ips.length > 1) {
                    port = Integer.parseInt(ips[1]);
                }
                addresses.add(new InetSocketAddress(ip, port));
            }
            connector = CanalConnectors.newClusterConnector(addresses, destination, username, password);
        }
        connector.connect();
        connector.subscribe(CANAL_FILTER_REGEX);
        connector.rollback();
    } catch (Exception e) {
        // Errors are logged rather than rethrown, so callers see no exception
        logger.error("-----Canal Config init Error-----", e);
    }
}

Developer ID: dev-share, Project: database-transform-tool, Lines of code: 37, Source: CanalFactory.java
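
The address parsing in Example 4 can be pulled into a standalone helper so that it can be tested on its own. The sketch below keeps the same conventions (comma-separated host[:port] entries, port 11111 when none is given) and additionally trims whitespace and rejects malformed entries, which the original does not do. The class name CanalAddressParser and the method parse are hypothetical names introduced here.

```java
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;

final class CanalAddressParser {

    // Same default port as the one hard-coded in Example 4
    static final int DEFAULT_PORT = 11111;

    /** Parses "host1:port1,host2" into socket addresses, using DEFAULT_PORT when a port is omitted. */
    public static List<SocketAddress> parse(String servers) {
        List<SocketAddress> addresses = new ArrayList<>();
        if (servers == null) {
            return addresses;
        }
        for (String entry : servers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // skip empty entries such as a trailing comma
            }
            String[] parts = trimmed.split(":");
            if (parts.length > 2 || parts[0].trim().isEmpty()) {
                throw new IllegalArgumentException("Invalid address: " + trimmed);
            }
            int port = DEFAULT_PORT;
            if (parts.length == 2) {
                // NumberFormatException is a subclass of IllegalArgumentException
                port = Integer.parseInt(parts[1].trim());
            }
            addresses.add(new InetSocketAddress(parts[0].trim(), port));
        }
        return addresses;
    }
}
```

The returned list has the type accepted by the CanalConnectors.newClusterConnector overload that Example 4 calls with a list of addresses.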


Example 5: initConector

import com.alibaba.otter.canal.client.CanalConnectors; // import the required package/class
public void initConector() {
    if (canalConfig == null) {
        throw new IllegalArgumentException("CustomSimpleCanalClient canalConfig property is empty!");
    }
    if ((invokeMap == null || invokeMap.isEmpty()) && this.globalInvoke == null) {
        throw new IllegalArgumentException("CustomSimpleCanalClient invokeMap property is empty!");
    }
    if (canalConfig instanceof SingleCanalConfig) {
        // Single server, no failover
        String socketStr = ((SingleCanalConfig) canalConfig).getSocketAddress();
        logger.info("canal will connect to {}.", socketStr);
        connector = CanalConnectors.newSingleConnector(this.getSocketAddressByString(socketStr),
                canalConfig.getDestination(), canalConfig.getUsername(), canalConfig.getPassword());
    } else if (canalConfig instanceof SocketsClusterCanalConfig) {
        // Cluster given as a fixed list of socket addresses
        List<SocketAddress> socketAddressList = new ArrayList<SocketAddress>();
        for (String sok : ((SocketsClusterCanalConfig) canalConfig).getSocketAddress()) {
            logger.info("canal will connect to {}.", sok);
            socketAddressList.add(this.getSocketAddressByString(sok));
        }
        connector = CanalConnectors.newClusterConnector(socketAddressList,
                canalConfig.getDestination(), canalConfig.getUsername(), canalConfig.getPassword());
    } else if (canalConfig instanceof ZkClusterCanalConfig) {
        // Cluster discovered through ZooKeeper
        String zkAddress = ((ZkClusterCanalConfig) canalConfig).getZkAddress();
        logger.info("canal will connect to {}.", zkAddress);
        connector = CanalConnectors.newClusterConnector(zkAddress,
                canalConfig.getDestination(), canalConfig.getUsername(), canalConfig.getPassword());
    } else {
        // Not in the original: fail fast instead of hitting a NullPointerException on connect()
        throw new IllegalArgumentException("Unsupported canalConfig type: " + canalConfig.getClass().getName());
    }
    connector.connect();
    connector.subscribe(canalConfig.getSubscribeChannel());
    connector.rollback();
}

Developer ID: fanqinghui, Project: canal-client, Lines of code: 31, Source: CustomSimpleCanalClient.java


Example 6: getCanalConnector

import com.alibaba.otter.canal.client.CanalConnectors; // import the required package/class
@Bean
public CanalConnector getCanalConnector() {
    canalConnector = CanalConnectors.newClusterConnector(Lists.newArrayList(new InetSocketAddress(canalHost, Integer.valueOf(canalPort))), canalDestination, canalUsername, canalPassword);
    canalConnector.connect();
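    // The filter format is {database}.{table}; no filter is set here, so filtering is left to the caller
    canalConnector.subscribe();
    // Roll back to resume from the position where consumption was last interrupted
    canalConnector.rollback();
    logger.info("canal client started successfully");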
    return canalConnector;
}
 
Developer ID: starcwang, Project: canal_mysql_elasticsearch_sync, Lines of code: 12, Source: CanalClient.java


Example 7: main

import com.alibaba.otter.canal.client.CanalConnectors; // import the required package/class
public static void main(String args[]) {
    String destination = "example";

    // Connect using a fixed list of canal server addresses; if one server crashes, the connector can fail over
    // CanalConnector connector = CanalConnectors.newClusterConnector(
    // Arrays.asList(new InetSocketAddress(
    // AddressUtils.getHostIp(),
    // 11111)),
    // "stability_test", "", "");

    // Resolve canal server addresses dynamically through ZooKeeper; if one server crashes, the connector can fail over
    CanalConnector connector = CanalConnectors.newClusterConnector("127.0.0.1:2181", destination, "", "");

    final ClusterCanalClientTest clientTest = new ClusterCanalClientTest(destination);
    clientTest.setConnector(connector);
    clientTest.start();

    Runtime.getRuntime().addShutdownHook(new Thread() {

        public void run() {
            try {
                logger.info("## stop the canal client");
                clientTest.stop();
            } catch (Throwable e) {
                logger.warn("##something goes wrong when stopping canal:", e);
            } finally {
                logger.info("## canal client is down.");
            }
        }

    });
}
 
Developer ID: alibaba, Project: canal, Lines of code: 33, Source: ClusterCanalClientTest.java


Example 8: main

import com.alibaba.otter.canal.client.CanalConnectors; // import the required package/class
public static void main(String args[]) {
    // Connect directly by IP address; this mode provides no HA
    String destination = "example";
    String ip = AddressUtils.getHostIp();
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, 11111),
        destination,
        "",
        "");

    final SimpleCanalClientTest clientTest = new SimpleCanalClientTest(destination);
    clientTest.setConnector(connector);
    clientTest.start();
    Runtime.getRuntime().addShutdownHook(new Thread() {

        public void run() {
            try {
                logger.info("## stop the canal client");
                clientTest.stop();
            } catch (Throwable e) {
                logger.warn("##something goes wrong when stopping canal:", e);
            } finally {
                logger.info("## canal client is down.");
            }
        }

    });
}
 
Developer ID: alibaba, Project: canal, Lines of code: 28, Source: SimpleCanalClientTest.java


Example 9: getConnector

import com.alibaba.otter.canal.client.CanalConnectors; // import the required package/class
private CanalConnector getConnector(String zkServers, String destination, String username, String password) {
    return CanalConnectors.newClusterConnector(zkServers, destination, username, password);
}
 
Developer ID: lackhurt, Project: flume-canal-source, Lines of code: 4, Source: CanalClient.java


Example 10: init

import com.alibaba.otter.canal.client.CanalConnectors; // import the required package/class
/**
 * @description Canal service configuration
 * @author yi.zhang
 * @time 2017-04-19 10:38:42 AM
 */
public void init(String destinations, String servers, String username, String password, String filter_regex, boolean isZookeeper, Integer batch_size) {
    try {
        if (filter_regex != null) {
            CANAL_FILTER_REGEX = filter_regex;
        }
        if (batch_size != null) {
            BATCH_SIZE = batch_size;
        }
        if (servers == null || "".equals(servers)) {
            return;
        }
        if (destinations != null && !"".equals(destinations)) {
            // One connector per destination in the comma-separated list
            for (String destination : destinations.split(",")) {
                if (destination == null || "".equals(destination)) {
                    continue;
                }
                CanalConnector connector = null;
                if (isZookeeper) {
                    // `servers` is a ZooKeeper address string
                    connector = CanalConnectors.newClusterConnector(servers, destination, username, password);
                } else {
                    // `servers` is a comma-separated list of host[:port] entries
                    List<SocketAddress> addresses = new ArrayList<SocketAddress>();
                    for (String address : servers.split(",")) {
                        String[] ips = address.split(":");
                        String ip = ips[0];
                        int port = 11111; // default port when none is given
                        if (ips.length > 1) {
                            port = Integer.parseInt(ips[1]);
                        }
                        addresses.add(new InetSocketAddress(ip, port));
                    }
                    if (addresses.size() == 1) {
                        // A single address needs no failover
                        connector = CanalConnectors.newSingleConnector(addresses.get(0), destination, username, password);
                    } else {
                        connector = CanalConnectors.newClusterConnector(addresses, destination, username, password);
                    }
                }
                connector.connect();
                connector.subscribe(CANAL_FILTER_REGEX);
                connector.rollback();
                cache.put(destination, connector);
            }
        }
    } catch (Exception e) {
        // Errors are logged rather than rethrown, so callers see no exception
        logger.error("-----Muti Canal Config init Error-----", e);
    }
}

Developer ID: dev-share, Project: database-transform-tool, Lines of code: 53, Source: MutiCanalFactory.java


Example 11: create

import com.alibaba.otter.canal.client.CanalConnectors; // import the required package/class
@Override
public CanalConnector create(String instanceName) {
    return CanalConnectors.newSingleConnector(address, instanceName, null, null);
}
 
Developer ID: wxingyl, Project: search-commons, Lines of code: 5, Source: Schemas.java



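Note: The com.alibaba.otter.canal.client.CanalConnectors examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects, and copyright of the source code belongs to the original authors. For distribution and use, refer to each project's License. Do not reproduce without permission.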

