// DruidDataSource 的内部类,对应主流程3,用来补充连接
public class CreateConnectionThread extends Thread {
public CreateConnectionThread(String name){
// 重置线程名称
super(name);
// 标记为守护线程
this.setDaemon(true);
}
// run 方法
public void run() {
// 通知 init(主流程2)自己已经启动成功
initedLatch.countDown();
long lastDiscardCount = 0;
int errorCount = 0;
// 死循环
for (;;) {
// addLast
try {
// 锁获取
lock.lockInterruptibly();
} catch (InterruptedException e2) {
break;
}
long discardCount = DruidDataSource.this.discardCount;
// 当前丢弃连接数与最后一次丢弃连接数的差值大于 0,说明又发生了丢弃连接的现象,该条件会促进连接的创建
boolean discardChanged = discardCount - lastDiscardCount > 0;
lastDiscardCount = discardCount;
try {
boolean emptyWait = true;
if (createError != null
&& poolingCount == 0
&& !discardChanged) {
emptyWait = false;
}
if (emptyWait
&& asyncInit && createCount < initialSize) {
emptyWait = false;
}
if (emptyWait) {
// 必须存在线程等待,才创建连接
if (poolingCount >= notEmptyWaitThreadCount //
&& (!(keepAlive && activeCount + poolingCount < minIdle))
&& !isFailContinuous()
) {
// 不需要创建连接时,阻塞(挂起)
empty.await();
}
// 防止创建超过maxActive数量的连接
if (activeCount + poolingCount >= maxActive) {
// 超出限制依然挂起,不再新增连接
empty.await();
continue;
}
}
} catch (InterruptedException e) {
lastCreateError = e;
lastErrorTimeMillis = System.currentTimeMillis();
if ((!closing) && (!closed)) {
LOG.error("create connection Thread Interrupted, url: " + jdbcUrl, e);
}
break;
} finally {
// 锁释放
lock.unlock();
}
// 从上面的程序走到这里,说明该线程被成功唤起,则进行新建连接
PhysicalConnectionInfo connection = null;
try {
// 利用驱动程序新建物理连接
connection = createPhysicalConnection();
} catch (SQLException e) {
LOG.error("create connection SQLException, url: " + jdbcUrl + ", errorCode " + e.getErrorCode()
+ ", state " + e.getSQLState(), e);
errorCount++;
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
// fail over retry attempts
setFailContinuous(true);
if (failFast) {
lock.lock();
try {
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
if (breakAfterAcquireFailure) {
break;
}
try {
Thread.sleep(timeBetweenConnectErrorMillis);
} catch (InterruptedException interruptEx) {
break;
}
}
} catch (RuntimeException e) {
LOG.error("create connection RuntimeException", e);
setFailContinuous(true);
continue;
} catch (Error e) {
LOG.error("create connection Error", e);
setFailContinuous(true);
break;
}
if (connection == null) {
// 新建失败后再次尝试
continue;
}
// 尝试放入池子
boolean result = put(connection);
if (!result) {
JdbcUtils.close(connection.getPhysicalConnection());
LOG.info("put physical connection to pool failed.");
}
errorCount = 0; // reset errorCount
if (closing || closed) {
break;
}
}
}
}
// 这一个 put 方法是上面触发接收 PhysicalConnectionInfo 类型连接用的,之前说过,最终保存在池子里的连接对象都是 DruidConnectionHolder 类型,所以这里时进行一次包装,然后真正 put 进去的是更下面的 put 方法
protected boolean put(PhysicalConnectionInfo physicalConnectionInfo) {
DruidConnectionHolder holder = null;
try {
// 包装成 holder 类型
holder = new DruidConnectionHolder(DruidDataSource.this, physicalConnectionInfo);
} catch (SQLException ex) {
lock.lock();
try {
if (createScheduler != null) {
clearCreateTask(physicalConnectionInfo.createTaskId);
}
} finally {
lock.unlock();
}
LOG.error("create connection holder error", ex);
return false;
}
// 真正放入池子
return put(holder, physicalConnectionInfo.createTaskId, false);
}
// 真正将连接对象放入池子
private boolean put(DruidConnectionHolder holder, long createTaskId, boolean checkExists) {
lock.lock();
try {
if (this.closing || this.closed) {
// 如果此时发现当前池子里的闲置连接数已经超过了 maxActive,那么就不再往里面加了
return false;
}
if (poolingCount >= maxActive) {
if (createScheduler != null) {
clearCreateTask(createTaskId);
}
return false;
}
if (checkExists) {
for (int i = 0; i < poolingCount; i++) {
if (connections[i] == holder) {
return false;
}
}
}
// 加在数组尾部
connections[poolingCount] = holder;
incrementPoolingCount();
if (poolingCount > poolingPeak) {
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();
}
// 唤起一个因为拿不到连接对象而发生阻塞的业务线程,让其再次进入运行状态,进行获取连接竞争
notEmpty.signal();
notEmptySignalCount++;
// 模式未启用
if (createScheduler != null) {
clearCreateTask(createTaskId);
if (poolingCount + createTaskCount < notEmptyWaitThreadCount //
&& activeCount + poolingCount + createTaskCount < maxActive) {
emptySignal();
}
}
} finally {
lock.unlock();
}
return true;
}