调用堆栈
io.vertx.ext.mongo.impl.MongoClientImpl;
io.vertx.ext.mongo.impl.MongoHolder;
io.vertx.ext.mongo.impl.config.MongoClientOptionsParser;
实现过程
当调用MongoClient::createShared()或MongoClient::createNonShared()方法创建mongo的客户端时,最终都会调用到MongoClientImpl的构造函数。
public MongoClientImpl(Vertx vertx, JsonObject config, String dataSourceName) {
Objects.requireNonNull(vertx);
Objects.requireNonNull(config);
Objects.requireNonNull(dataSourceName);
this.vertx = vertx;
// 检查或创建新的MongHolder
this.holder = lookupHolder(dataSourceName, config);
this.mongo = holder.mongo();
this.useObjectId = config.getBoolean("useObjectId", false);
}
如果是通过createNonShared方法创建client时,这里传入的dataSourceName是一个UUID。当使用createShared创建client,会在lookupHolder方法中检查是否已经创建了同名的客户端,否则新建。
下图是检查数据源的过程。会根据传入的 datasourceName 从 vertx 实例的共享数据实例(io.vertx.core.shareddata.SharedData)中获取同名的 MongoHolder 实例。
private MongoHolder lookupHolder(String datasourceName, JsonObject config) {
synchronized (vertx) {
// 获取共享数据实例中的map
LocalMap<String, MongoHolder> map = vertx.sharedData().getLocalMap(DS_LOCAL_MAP_NAME);
// 检查datasourceName对应的MongoHolder 是否存在
MongoHolder theHolder = map.get(datasourceName);
// 不存在则新构建,并将构建的结果放入sharedData的map中
if (theHolder == null) {
theHolder = new MongoHolder(config, () -> removeFromMap(map, datasourceName));
map.put(datasourceName, theHolder);
} else {
// 递增被引用的计数
theHolder.incRefCount();
}
return theHolder;
}
}
如果实例不存在,则会创建新的 MongoHolder 实例。个人认为这里有个很不完美的地方是为了解决懒汉模式的问题,增加了一个线程锁。在高并发请求数据库连接资源时,这里会有阻塞。因此我在自己的实现类中存储了MongoClient的实例。不过这个线程锁可以有效减少数据库连接池的爆发式增长,在数据库连接池资源较少的情况下,有不错的效果(比如我们某个项目使用了阿里云的mongDB,最低配置只有200个连接)。
下图是 MongoHolder 的构造方法。
public MongoHolder(JsonObject config, Runnable closeRunner) {
this.config = config;
this.closeRunner = closeRunner;
}
MongoHolder 构造方法只是简单的设置了成员变量 config 和 closeRunner 的值。closeRunner当调用MongoClient::close()方法时,用于回调销毁SharedData::localMap中的对应索引数据。config 是用户传入的配置参数,需要注意的是,config 传递到这里一直没有被改变。
创建 MongoHolder 的实例成功后,接下来会调用 MongoHolder::mongo() 来创建一个真正 com.mongodb.async.client.MongoClient 实例。这里同样使用了懒汉模式,存在线程锁,如果MongoClient的实例存在直接返回,如果不存在,则新建一个实例。
synchronized com.mongodb.async.client.MongoClient mongo() {
if (mongo == null) {
// 解析外部传递的config
MongoClientOptionsParser parser = new MongoClientOptionsParser(config);
// 将解析结果用于创建新的com.mongodb.async.client.MongoClient实例
mongo = MongoClients.create(parser.settings());
String dbName = config.getString("db_name", DEFAULT_DB_NAME);
db = mongo.getDatabase(dbName);
}
return mongo;
}
MongoClientOptionsParser 对象是创建Mongo客户端的关键,他会解析用户传递的参数来创建mongDB客户端,理解他的解析方法有利于创建合适的客户端。
MongoClientOptionsParser 的构造方法共有60行,这里分几段说明。为了更好的理解创建过程,建议了解下MongoDB异步Java驱动。下面的构造客户端参数的第一部分。
public MongoClientOptionsParser(JsonObject config) {
Objects.requireNonNull(config);
// 创建mongoDB的构建对象
MongoClientSettings.Builder options = MongoClientSettings.builder();
// 注册对象数据的存储规则
options.codecRegistry(CodecRegistries.fromRegistries(commonCodecRegistry, CodecRegistries.fromCodecs(new JsonObjectCodec(config))));
// 获取连接串,所有的定义参数都来自连接串
String cs = config.getString("connection_string");
// 解析连接串
ConnectionString connectionString = (cs == null) ? null : new ConnectionString(cs);
// 解析集群参数
ClusterSettings clusterSettings = new ClusterSettingsParser(connectionString, config).settings();
options.clusterSettings(clusterSettings);
// 解析连接池参数
ConnectionPoolSettings connectionPoolSettings = new ConnectionPoolSettingsParser(connectionString, config).settings();
options.connectionPoolSettings(connectionPoolSettings);
// some code
}
首先创建 MongoClientSettings 的构造对象。
然后根据传递的参数构建 CodecRegistry 实例。CodecRegistry 的说明见 mongDB官网CodecRegistry的API说明。CodecRegistry 用于指定相关的对象在mongoDB的读写实现类,例如官方已经源生实现了 StringCodec、IntegerCodec来处理Java的String、Integer对象。
这段代码的最后部分,创建一个 ConnectionString 实例来分解和存储连接串的解析结果。ConnectionString 是 mongoDB 官方实现的解析连接串参数方法。可以将http协议串解析成对应的初始化参数,例如设置连接池最小连接数为20,最大连接数为200: mongodb://host:27017/?minPoolSize=20&maxPoolSize=200 。详细说明见 ConnectionString 的API文档 和 mongoDB官方指引手册 。
下面的代码是 ClusterSettingsParser 对传入的数据进行解析,vertx-mongdb解析连接参数都是采用类似的思路:优先使用mongodb源生连接串中指定的参数,如果参数不存在,则使用用户传入的参数。因此,在我们设计mongodb的连接参数时,可以在传入的JsonObject实例中统一在key="connection_string"的参数中一次性制定mongdb风格的连接字符串,还可以在这个实例中通过key值设置vertx风格的各种连接参数。如果2个参数都存在,则优先使用连接字符串。
public ClusterSettingsParser(ConnectionString connectionString, JsonObject config) {
// 创建mongdb集群builder方法
ClusterSettings.Builder settings = ClusterSettings.builder();
// 优先从连接字符串中使用mongdb源生方法解析相关参数
if (connectionString != null) {
settings.applyConnectionString(connectionString);
} else {
// 如果连接字符串中相关的参数不存在,则从用户传入的config中提取指定的数据
// 设置host列表
// 在parseHosts中优先解析config是否存在包含key=hosts的JsonArray实例,如果有则会即系多个连接服务器
// 如果没有key=hosts,则解析host和port是否存在
List<ServerAddress> hosts = parseHosts(config);
settings.hosts(hosts);
// 设置mongdb的运行模式和replica模式
String replicaSet = config.getString("replicaSet");
if (hosts.size() == 1 && replicaSet == null) {
settings.mode(ClusterConnectionMode.SINGLE);
} else {
settings.mode(ClusterConnectionMode.MULTIPLE);
}
if (replicaSet != null) {
settings.requiredReplicaSetName(replicaSet);
}
}
this.settings = settings.build();
}
这里就不一一说明每一个解析方法,基本上都是一样的套路。
解析完连接参数后,用这些参数直接调用MongoClients::create来创建mongdb的客户端实例。然后从客户端从获取mongodb的连接。
总结
至此,mongdb的创建过程完毕。在创建的过程中,可以实现mongdb源生的连接串,也可以使用vertx风格的JsonObject。mongdb自身已经实现了全异步接口,因此vertx-mongdb只是在此基础上进行了一层封装。下面的附表是vertx-mongdb相关的设置参数。可以在建立vertx-mongdb实例时,通过JsonObject传入。
{
// 设置单个mongdb服务时使用host、port指定主机和端口
"host" : "17.0.0.1", // string --mongdb实例所在的地址
"port" : 27017, // int --mongdb实例的端口号
// 设置集群mongdb服务器时使用队列
"hosts" : [
{
"host" : "cluster1", // string --集群1地址
"port" : 27000 // int --集群1端口号
},
{
"host" : "cluster2", // string --集群2地址
"port" : 28000 // int --集群2端口号
},
...
],
// 数据库分布式方法
"replicaSet" : "foo" // string
// 连接池参数
"maxPoolSize" : 100, // int --最大连接数
"minPoolSize" : 0, // int --最小连接数
"maxIdleTimeMS" : 0, // long --单个连接空闲释放时间,0时表示没有时间限制
"maxLifeTimeMS" : 0, // long --单个连接最大存活时间,0时表示灭有时间限制
"waitQueueMultiple" : 500, // int --等待获取连接的排队队列最大数量。
"waitQueueTimeoutMS" : 120000, // long --等待获取连接的最大等待时间。
"maintenanceFrequencyMS" : 0, // long
"maintenanceInitialDelayMS" : 0, // long
// 账户、密码、连接信息
"username" : "john", // string
"password" : "passw0rd", // string
"authSource" : "some.db" // string
"authMechanism" : "GSSAPI", // string --认证机制相关配置,详情见http://docs.mongodb.org/manual/core/authentication/
"gssapiServiceName" : "myservicename", // string --Kerberos单点登录相关接口API配置。
// 联网相关的配置
"connectTimeoutMS" : 10000 , // int // --连接到mongdb数据库实例返回的等待时间
"socketTimeoutMS" : 0, // int // --通过socket完成数据库相关操作的等待与返回时间,0时表示没有限制。
"sendBufferSize" : 0, // int // --设置通过socket发送数据的缓存大小,0时表示使用操作系统默认值。
"receiveBufferSize" : 0, // int --设置通过socket获取数据的缓存大小,0时表示使用操作系统默认值。
"keepAlive" : false // boolean --设置是否保持数据库连接,默认为false
// 设置集群之间的心跳配置
"heartbeat.socket" : {
"connectTimeoutMS" : 300000, // int
"socketTimeoutMS" : 100000, // int
"sendBufferSize" : 8192, // int
"receiveBufferSize" : 8192, // int
"keepAlive" : true // boolean
}
// 设置客户端和mongdb实例的心跳测试
"heartbeatFrequencyMS" : 5000 // long 集群监视器监控到达每个mongdb实例的心跳频率
"minHeartbeatFrequencyMS" : 1000 // long 当前客户端到服务器的监控频率
}