12 管理协议的法定人对象QuorumPeer创建与初始化
12 .1 简介
在前面我们说到QuorumPeerMain使用配置运行的时候 <<10-使用配置信息启动运行入口>>
相关代码如下:
//节点创建与配置初始化
quorumPeer = getQuorumPeer();
quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
//quorumPeer.setQuorumPeers(config.getAllMembers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
quorumPeer.setConfigFileName(config.getConfigFilename());
quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier() != null) {
quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}
quorumPeer.initConfigInZKDatabase();
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
quorumPeer.setSslQuorum(config.isSslQuorum());
quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
if (config.sslQuorumReloadCertFiles) {
quorumPeer.getX509Util().enableCertFileReloading();
}
quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());
// sets quorum sasl authentication configurations
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
if (quorumPeer.isQuorumSaslAuthEnabled()) {
quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
}
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
//节点初始化
quorumPeer.initialize();
if (config.jvmPauseMonitorToRun) {
quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
}
//节点启动
quorumPeer.start();
ZKAuditProvider.addZKStartStopAuditLog();
//节点加入集群
quorumPeer.join();
对应QuorumPeerMain中调用的getQuorumPeer()方法创建QuorumPeer类型对象的代码如下:
protected QuorumPeer getQuorumPeer() throws SaslException {
return new QuorumPeer();
}
这个QuorumPeer类就是Zookeeper的Leader选举的启动类,负责创建选举算法,zk数据恢复,启动leader选举等。通过这里就可以看到这个QuorumPeer相当重要相当于启动了一个选举角色的节点
Zookeeper使用Zookeeper Atomic Broadcast(Zookeeper 原子广播协议) 简称ZAB协议进行投票,选举出一个leader节点来进行处理数据.
QuorumPeer这个类管理仲裁协议, 该服务器可以处于三种状态:
Leader election—每个服务器将选举一个领袖(最初提议自己是一个领袖)。
Follower -服务器将与leader同步并复制任何事务。
Leader—服务器将处理请求并将其转发给追随者。大多数关注者必须在请求被接受之前将其记录下来。
这个类将设置一个数据报套接字,该套接字将始终响应其当前leader的视图。响应将采取以下形式:
int xid;
long myid;
long leader_id;
long leader_zxid;
对当前leader的请求将只包含一个xid: int xid;
QuorumPeer类型继承了ZooKeeperThread,而ZookeeperThread类型继承了Thread
也就是说QuorumPeer其实是一个线程类型
接下来看下创建QuorumPeer对象会做哪些初始化操作:
public QuorumPeer() throws SaslException {
super("QuorumPeer");
//创建QuorumStats来提供Quorum的状态
quorumStats = new QuorumStats(this);
//记录所有的远程仲裁节点信息RemotePeerBean
jmxRemotePeerBean = new HashMap<Long, RemotePeerBean>();
//根据配置决定是否开启管理服务,系统使用jetty提供管理服务,默认提供8080端口访问
adminServer = AdminServerFactory.createAdminServer();
//创建 X509 用于安全连接的证书加密验证工具类
x509Util = createX509Util();
//根据是否需要加密连接创建认证对象
initialize();
//动态配置是否赋值
reconfigEnabled = QuorumPeerConfig.isReconfigEnabled();
}
构造器一般用于一些基础对象的初始化比较简单.
11.2 管理协议的法定人对象QuorumPeer 启动
创建完QuorumPeer对象后就是将参数中的配置初始化到QuorumPeer对象中。看后面启动的时候我们可以了解下这个类的继承关系,QuorumPeer继承了ZooKeeperThread
而ZooKeeperThread继承了Thread,其实这是一个线程类,接下来我们就看下这类的启动:
管理协议的法定人对象QuorumPeer启动方法
//重写了线程的start方法 start为代理方法在启动之前做一些逻辑处理
quorumPeer.start();
在QuorumPeer类型中 重写了start方法,启动线程的调用被放在了start方法的最后,启动之前,这种代理方法策略一般用于我们对某些操作逻辑的增强逻辑类似AOP机制 , start操作如下代码:
@Override
public synchronized void start() {
//getView是当前初始化的法定人对象 列表
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
//将数据从磁盘加载到内存中,并将事务添加到内存中的committedlog中。 读取快照和事物文件
loadDataBase();
//服务端开启连接线程
startServerCnxnFactory();
try {
//开启管理端
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
}
//开启选举功能
startLeaderElection();
//开启JVM监控
startJvmPauseMonitor();
//线程启动QuorumPeer开始运行
super.start();
}
启动逻辑
正常情况下我们调用线程类的start方法,线程类的start方法调用native类型的start0方法来创建线程,QuorumPeer重写了start方法在线程启动之前执行了一些初始化操作
QuorumPeer类型中 重写了start方法主要步骤如下:
- 将数据从磁盘加载到内存中,并将事务添加到内存中的committedlog中。
- 服务端开启连接线程
- 开启管理端
- 开启选举功能
- 开启JVM监控
- QuorumPeer线程启动,开始进行逻辑处理
查看原文,技术咨询与支持,可以扫描微信公众号进行回复咨询
Zookeeper16, Zookeeper安装配置14, Zookeeper源码解析16