private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
// 可用性判断
if (closed) {
connectErrorCountUpdater.incrementAndGet(this);
throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis));
}
if (!enable) {
connectErrorCountUpdater.incrementAndGet(this);
if (disableException != null) {
throw disableException;
}
throw new DataSourceDisableException();
}
// 纳秒
final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
// 默认不开启(-1),表示取不到连接发生等待时阻塞的最大业务线程数,
final int maxWaitThreadCount = this.maxWaitThreadCount;
DruidConnectionHolder holder;
for (boolean createDirect = false;;) {
// 模式未启用,恒等 false,下面的逻辑不会触发,所以为了方便阅读,隐藏这部分代码
if (createDirect) {
// 代码隐藏
}
try {
// 锁获取
lock.lockInterruptibly();
} catch (InterruptedException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("interrupt", e);
}
try {
// 如果因为拿不到连接而阻塞的业务线程数达到阈值,则直接抛异常
if (maxWaitThreadCount > 0
&& notEmptyWaitThreadCount >= maxWaitThreadCount) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count "
+ lock.getQueueLength());
}
if (onFatalError
&& onFatalErrorMaxActive > 0
&& activeCount >= onFatalErrorMaxActive) {
connectErrorCountUpdater.incrementAndGet(this);
StringBuilder errorMsg = new StringBuilder();
errorMsg.append("onFatalError, activeCount ")
.append(activeCount)
.append(", onFatalErrorMaxActive ")
.append(onFatalErrorMaxActive);
if (lastFatalErrorTimeMillis > 0) {
errorMsg.append(", time '")
.append(StringUtils.formatDateTime19(
lastFatalErrorTimeMillis, TimeZone.getDefault()))
.append("'");
}
if (lastFatalErrorSql != null) {
errorMsg.append(", sql \n")
.append(lastFatalErrorSql);
}
throw new SQLException(
errorMsg.toString(), lastFatalError);
}
// 连接数累加
connectCount++;
if (createScheduler != null
&& poolingCount == 0
&& activeCount < maxActive
&& creatingCountUpdater.get(this) == 0
&& createScheduler instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler;
if (executor.getQueue().size() > 0) {
// createScheduler 这种异步添加模式不开启(默认不开启,本文也不是基于该模式的),createDirect 永远不等于 true,所以上面 createDirect==true 的代码不会被触发
createDirect = true;
continue;
}
}
if (maxWait > 0) {
// 尝试从池子里获取连接
holder = pollLast(nanos);
} else {
holder = takeLast();
}
if (holder != null) {
if (holder.discard) {
continue;
}
// 拿到连接,activeCount 累加
activeCount++;
holder.active = true;
if (activeCount > activePeak) {
activePeak = activeCount;
activePeakTime = System.currentTimeMillis();
}
}
} catch (InterruptedException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException(e.getMessage(), e);
} catch (SQLException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw e;
} finally {
lock.unlock();
}
break;
}
// 没有获取到连接,整理错误信息,抛出错误
if (holder == null) {
long waitNanos = waitNanosLocal.get();
final long activeCount;
final long maxActive;
final long creatingCount;
final long createStartNanos;
final long createErrorCount;
final Throwable createError;
try {
lock.lock();
activeCount = this.activeCount;
maxActive = this.maxActive;
creatingCount = this.creatingCount;
createStartNanos = this.createStartNanos;
createErrorCount = this.createErrorCount;
createError = this.createError;
} finally {
lock.unlock();
}
StringBuilder buf = new StringBuilder(128);
buf.append("wait millis ")//
.append(waitNanos / (1000 * 1000))//
.append(", active ").append(activeCount)//
.append(", maxActive ").append(maxActive)//
.append(", creating ").append(creatingCount)//
;
if (creatingCount > 0 && createStartNanos > 0) {
long createElapseMillis = (System.nanoTime() - createStartNanos) / (1000 * 1000);
if (createElapseMillis > 0) {
buf.append(", createElapseMillis ").append(createElapseMillis);
}
}
if (createErrorCount > 0) {
buf.append(", createErrorCount ").append(createErrorCount);
}
List<JdbcSqlStatValue> sqlList = this.getDataSourceStat().getRuningSqlList();
for (int i = 0; i < sqlList.size(); ++i) {
if (i != 0) {
buf.append('\n');
} else {
buf.append(", ");
}
JdbcSqlStatValue sql = sqlList.get(i);
buf.append("runningSqlCount ").append(sql.getRunningCount());
buf.append(" : ");
buf.append(sql.getSql());
}
String errorMessage = buf.toString();
if (createError != null) {
throw new GetConnectionTimeoutException(errorMessage, createError);
} else {
throw new GetConnectionTimeoutException(errorMessage);
}
}
holder.incrementUseCount();
// 包装成目标对象
DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder);
// 返回
return poolalbeConnection;
}
// 尝试从池子里获取连接
private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException {
long estimate = nanos;
for (;;) {
// 池子里的空闲连接为 0,说明需要通知主流程3新增连接了
if (poolingCount == 0) {
// empty.signal,唤起主流程3新增连接
emptySignal(); // send signal to CreateThread create connection
//如果设置为快速结束,则不阻塞业务线程,直接抛出异常
if (failFast && isFailContinuous()) {
throw new DataSourceNotAvailableException(createError);
}
if (estimate <= 0) {
waitNanosLocal.set(nanos - estimate);
return null;
}
// 因为获取不到连接而陷入阻塞状态的业务线程数 +1
notEmptyWaitThreadCount++;
if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
}
try {
long startEstimate = estimate;
// 阻塞(挂起)estimate 这么长的时间,期间如果被唤醒,则 estimate 就会被刷新成剩余等待时间
estimate = notEmpty.awaitNanos(estimate); // signal by
// recycle or
// creator
notEmptyWaitCount++;
notEmptyWaitNanos += (startEstimate - estimate);
if (!enable) {
connectErrorCountUpdater.incrementAndGet(this);
if (disableException != null) {
throw disableException;
}
throw new DataSourceDisableException();
}
} catch (InterruptedException ie) {
// 期间线程被中断,则唤起一次其他处于阻塞状态的业务线程
notEmpty.signal(); // propagate to non-interrupted thread
notEmptySignalCount++;
throw ie;
} finally {
notEmptyWaitThreadCount--;
}
// 依然没有竞争到
if (poolingCount == 0) {
// 如果目标阻塞时间(maxWait)还没有用完,则继续尝试获取
if (estimate > 0) {
continue;
}
waitNanosLocal.set(nanos - estimate);
return null;
}
}
// poolingCount--
decrementPoolingCount();
// 直接获取
DruidConnectionHolder last = connections[poolingCount];
// 获取后意味着连接已被借出,原有位置置空
connections[poolingCount] = null;
// 标记这次获取连接花了多长时间,连接够用时便为 0
long waitNanos = nanos - estimate;
last.setLastNotEmptyWaitNanos(waitNanos);
// 返回
return last;
}
}