MyBatis源码之数据源模块
MyBatis 自身提供了相应的数据源实现,当然 MyBatis 也提供了与第三方数据源集成的接口,这些功能都位于数据源模块之中
1. DataSourceFactory
javax.sql.DataSource 工厂接口, 所在包: org.apache.ibatis.datasource.DataSourceFactory
1 2 3 4 5 6 7 8 9 10 11 12 13 public interface DataSourceFactory { void setProperties (Properties props) ; DataSource getDataSource () ; }
1.1 UnpooledDataSourceFactory
实现 DataSourceFactory 接口,非池化的 DataSourceFactory 实现类, 非池化即不使用连接池技术管理数据库连接的数据源, 无连接复用机制, 每次连接都需要通过数据库驱动新建一个数据库连接, 简单性能不高, 所在包: org.apache.ibatis.datasource.unpooled.UnpooledDataSourceFactory
1.1.1 构造方法 1 2 3 4 5 6 7 8 9 10 11 12 private static final String DRIVER_PROPERTY_PREFIX = "driver." ;private static final int DRIVER_PROPERTY_PREFIX_LENGTH = DRIVER_PROPERTY_PREFIX.length();protected DataSource dataSource;public UnpooledDataSourceFactory () { this .dataSource = new UnpooledDataSource (); }
1.1.2 getDataSource方法 1 2 3 4 @Override public DataSource getDataSource () { return dataSource; }
1.1.3 setProperties方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 @Override public void setProperties (Properties properties) { Properties driverProperties = new Properties (); MetaObject metaDataSource = SystemMetaObject.forObject(dataSource); for (Object key : properties.keySet()) { String propertyName = (String) key; if (propertyName.startsWith(DRIVER_PROPERTY_PREFIX)) { String value = properties.getProperty(propertyName); driverProperties.setProperty(propertyName.substring(DRIVER_PROPERTY_PREFIX_LENGTH), value); } else if (metaDataSource.hasSetter(propertyName)) { String value = (String) properties.get(propertyName); Object convertedValue = convertValue(metaDataSource, propertyName, value); metaDataSource.setValue(propertyName, convertedValue); } else { throw new DataSourceException ("Unknown DataSource property: " + propertyName); } } if (driverProperties.size() > 0 ) { metaDataSource.setValue("driverProperties" , driverProperties); } }private Object convertValue (MetaObject metaDataSource, String propertyName, String value) { Object convertedValue = value; Class<?> targetType = metaDataSource.getSetterType(propertyName); if (targetType == Integer.class || targetType == int .class) { convertedValue = Integer.valueOf(value); } else if (targetType == Long.class || targetType == long .class) { convertedValue = Long.valueOf(value); } else if (targetType == Boolean.class || targetType == boolean .class) { convertedValue = Boolean.valueOf(value); } return convertedValue; }
1.2 PooledDataSourceFactory
继承 UnpooledDataSourceFactory 类,池化的 DataSourceFactory 实现类, 所在包: org.apache.ibatis.datasource.pooled.PooledDataSourceFactory
1 2 3 4 5 6 7 8 public class PooledDataSourceFactory extends UnpooledDataSourceFactory { public PooledDataSourceFactory () { this .dataSource = new PooledDataSource (); } }
1.3 JndiDataSourceFactory
实现 DataSourceFactory 接口,基于 JNDI 的 DataSourceFactory 实现类, 这个数据源的实现是为了能在如 EJB 或应用服务器这类容器中使用,容器可以集中或在外部配置数据源,然后放置一个 JNDI 上下文的引用, 所在包: org.apache.ibatis.datasource.jndi.JndiDataSourceFactory
1.3.1 构造方法 1 2 private DataSource dataSource;
1.3.2 getDataSource 1 2 3 4 @Override public DataSource getDataSource () { return dataSource; }
1.3.3 setProperties 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public static final String INITIAL_CONTEXT = "initial_context" ;public static final String DATA_SOURCE = "data_source" ;public static final String ENV_PREFIX = "env." ;@Override public void setProperties (Properties properties) { try { InitialContext initCtx; Properties env = getEnvProperties(properties); if (env == null ) { initCtx = new InitialContext (); } else { initCtx = new InitialContext (env); } if (properties.containsKey(INITIAL_CONTEXT) && properties.containsKey(DATA_SOURCE)) { Context ctx = (Context) initCtx.lookup(properties.getProperty(INITIAL_CONTEXT)); dataSource = (DataSource) ctx.lookup(properties.getProperty(DATA_SOURCE)); } else if (properties.containsKey(DATA_SOURCE)) { dataSource = (DataSource) initCtx.lookup(properties.getProperty(DATA_SOURCE)); } } catch (NamingException e) { throw new DataSourceException ("There was an error configuring JndiDataSourceTransactionPool. Cause: " + e, e); } }
2. DataSource
该接口可衍生数据连接池、分库分表、读写分离等功能
2.1 UnpooledDataSource 2.1.1 构造方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 private ClassLoader driverClassLoader;private Properties driverProperties;private static final Map<String, Driver> registeredDrivers = new ConcurrentHashMap <>();private String driver;private String url;private String username;private String password;private Boolean autoCommit;private Integer defaultTransactionIsolationLevel;private Integer defaultNetworkTimeout;static { Enumeration<Driver> drivers = DriverManager.getDrivers(); while (drivers.hasMoreElements()) { Driver driver = drivers.nextElement(); registeredDrivers.put(driver.getClass().getName(), driver); } }public UnpooledDataSource () { }public UnpooledDataSource (String driver, String url, String username, String password) { this .driver = driver; this .url = url; this .username = username; this .password = password; }public UnpooledDataSource (String driver, String url, Properties driverProperties) { this .driver = driver; this .url = url; this .driverProperties = driverProperties; }public UnpooledDataSource (ClassLoader driverClassLoader, String driver, String url, String username, String password) { this .driverClassLoader = driverClassLoader; this .driver = driver; this .url = url; this .username = username; this .password = password; }public UnpooledDataSource (ClassLoader driverClassLoader, String driver, String url, Properties driverProperties) { this .driverClassLoader = driverClassLoader; this .driver = driver; this .url = url; this .driverProperties = driverProperties; }
2.1.2 getConnection 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Override public Connection getConnection () throws SQLException { return doGetConnection(username, password); }@Override public Connection getConnection (String username, String password) throws SQLException { return doGetConnection(username, password); }private Connection doGetConnection (String username, String password) throws SQLException { Properties props = new Properties (); if (driverProperties != null ) { props.putAll(driverProperties); } if (username != null ) { props.setProperty("user" , username); } if (password != null ) { props.setProperty("password" , password); } return doGetConnection(props); }private Connection doGetConnection (Properties properties) throws SQLException { initializeDriver(); Connection connection = DriverManager.getConnection(url, properties); configureConnection(connection); return connection; }
2.1.3 initializeDriver 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private void initializeDriver () throws SQLException { try { MapUtil.computeIfAbsent(registeredDrivers, driver, x -> { Class<?> driverType; try { if (driverClassLoader != null ) { driverType = Class.forName(x, true , driverClassLoader); } else { driverType = Resources.classForName(x); } Driver driverInstance = (Driver) driverType.getDeclaredConstructor().newInstance(); DriverManager.registerDriver(new DriverProxy (driverInstance)); return driverInstance; } catch (Exception e) { throw new RuntimeException ("Error setting driver on UnpooledDataSource." , e); } }); } catch (RuntimeException re) { throw new SQLException ("Error setting driver on UnpooledDataSource." , re.getCause()); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private void configureConnection (Connection conn) throws SQLException { if (defaultNetworkTimeout != null ) { conn.setNetworkTimeout(Executors.newSingleThreadExecutor(), defaultNetworkTimeout); } if (autoCommit != null && autoCommit != conn.getAutoCommit()) { conn.setAutoCommit(autoCommit); } if (defaultTransactionIsolationLevel != null ) { conn.setTransactionIsolation(defaultTransactionIsolationLevel); } }
2.2 PooledDataSource
实现 DataSource 接口,池化的 DataSource 实现类, 所在包: org.apache.ibatis.datasource.pooled.PooledDataSource
2.2.1 构造方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 private static final Log log = LogFactory.getLog(PooledDataSource.class);private final PoolState state = new PoolState (this );private final UnpooledDataSource dataSource;protected int poolMaximumActiveConnections = 10 ;protected int poolMaximumIdleConnections = 5 ;protected int poolMaximumCheckoutTime = 20000 ;protected int poolTimeToWait = 20000 ;protected int poolMaximumLocalBadConnectionTolerance = 3 ;protected String poolPingQuery = "NO PING QUERY SET" ;protected boolean poolPingEnabled;protected int poolPingConnectionsNotUsedFor;private int expectedConnectionTypeCode;private final Lock lock = new ReentrantLock ();private final Condition condition = lock.newCondition();public PooledDataSource () { dataSource = new UnpooledDataSource (); }public PooledDataSource (UnpooledDataSource dataSource) { this .dataSource = dataSource; }public PooledDataSource (String driver, String url, String username, String password) { dataSource = new UnpooledDataSource (driver, url, username, password); expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword()); }public PooledDataSource (String driver, String url, Properties driverProperties) { dataSource = new UnpooledDataSource (driver, url, driverProperties); expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword()); }public PooledDataSource (ClassLoader driverClassLoader, String driver, String url, String username, String password) { dataSource = new UnpooledDataSource (driverClassLoader, driver, url, username, password); expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword()); }public PooledDataSource (ClassLoader driverClassLoader, String driver, String url, Properties driverProperties) { dataSource = new UnpooledDataSource (driverClassLoader, driver, url, driverProperties); expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword()); }
2.2.2 getConnection 1 2 3 4 5 6 7 8 9 10 11 @Override public Connection getConnection () throws SQLException { return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection(); }@Override public Connection getConnection (String username, String password) throws SQLException { return popConnection(username, password).getProxyConnection(); }
2.2.3 popConnection
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 private PooledConnection popConnection (String username, String password) throws SQLException { boolean countedWait = false ; PooledConnection conn = null ; long t = System.currentTimeMillis(); int localBadConnectionCount = 0 ; while (conn == null ) { lock.lock(); try { if (!state.idleConnections.isEmpty()) { conn = state.idleConnections.remove(0 ); if (log.isDebugEnabled()) { log.debug("Checked out connection " + conn.getRealHashCode() + " from pool." ); } } else if (state.activeConnections.size() < poolMaximumActiveConnections) { conn = new PooledConnection (dataSource.getConnection(), this ); if (log.isDebugEnabled()) { log.debug("Created connection " + conn.getRealHashCode() + "." ); } } else { PooledConnection oldestActiveConnection = state.activeConnections.get(0 ); long longestCheckoutTime = oldestActiveConnection.getCheckoutTime(); if (longestCheckoutTime > poolMaximumCheckoutTime) { state.claimedOverdueConnectionCount++; state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime; state.accumulatedCheckoutTime += longestCheckoutTime; state.activeConnections.remove(oldestActiveConnection); if (!oldestActiveConnection.getRealConnection().getAutoCommit()) { try { oldestActiveConnection.getRealConnection().rollback(); } catch (SQLException e) { log.debug("Bad connection. Could not roll back" ); } } conn = new PooledConnection (oldestActiveConnection.getRealConnection(), this ); conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp()); conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp()); oldestActiveConnection.invalidate(); if (log.isDebugEnabled()) { log.debug("Claimed overdue connection " + conn.getRealHashCode() + "." ); } } else { try { if (!countedWait) { state.hadToWaitCount++; countedWait = true ; } if (log.isDebugEnabled()) { log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection." ); } long wt = System.currentTimeMillis(); if (!condition.await(poolTimeToWait, TimeUnit.MILLISECONDS)) { log.debug("Wait failed..." ); } state.accumulatedWaitTime += System.currentTimeMillis() - wt; } catch (InterruptedException e) { Thread.currentThread().interrupt(); break ; } } } if (conn != null ) { if (conn.isValid()) { if (!conn.getRealConnection().getAutoCommit()) { conn.getRealConnection().rollback(); } conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password)); conn.setCheckoutTimestamp(System.currentTimeMillis()); conn.setLastUsedTimestamp(System.currentTimeMillis()); state.activeConnections.add(conn); state.requestCount++; state.accumulatedRequestTime += System.currentTimeMillis() - t; } else { if (log.isDebugEnabled()) { log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection." ); } state.badConnectionCount++; localBadConnectionCount++; conn = null ; if (localBadConnectionCount > poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance) { if (log.isDebugEnabled()) { log.debug("PooledDataSource: Could not get a good connection to the database." ); } throw new SQLException ("PooledDataSource: Could not get a good connection to the database." ); } } } } finally { lock.unlock(); } } if (conn == null ) { if (log.isDebugEnabled()) { log.debug("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection." ); } throw new SQLException ( "PooledDataSource: Unknown severe error condition. The connection pool returned a null connection." ); } return conn; }
2.2.4 pushConnection
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 protected void pushConnection (PooledConnection conn) throws SQLException { lock.lock(); try { state.activeConnections.remove(conn); if (conn.isValid()) { if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) { state.accumulatedCheckoutTime += conn.getCheckoutTime(); if (!conn.getRealConnection().getAutoCommit()) { conn.getRealConnection().rollback(); } PooledConnection newConn = new PooledConnection (conn.getRealConnection(), this ); state.idleConnections.add(newConn); newConn.setCreatedTimestamp(conn.getCreatedTimestamp()); newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp()); conn.invalidate(); if (log.isDebugEnabled()) { log.debug("Returned connection " + newConn.getRealHashCode() + " to pool." ); } condition.signal(); } else { state.accumulatedCheckoutTime += conn.getCheckoutTime(); if (!conn.getRealConnection().getAutoCommit()) { conn.getRealConnection().rollback(); } conn.getRealConnection().close(); if (log.isDebugEnabled()) { log.debug("Closed connection " + conn.getRealHashCode() + "." ); } conn.invalidate(); } } else { if (log.isDebugEnabled()) { log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection." ); } state.badConnectionCount++; } } finally { lock.unlock(); } }
2.2.5 pingConnection
过向数据库发起 poolPingQuery
语句来发起“ping”操作,以判断数据库连接是否有效
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 protected boolean pingConnection (PooledConnection conn) { boolean result; try { result = !conn.getRealConnection().isClosed(); } catch (SQLException e) { if (log.isDebugEnabled()) { log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage()); } result = false ; } if (result && poolPingEnabled && poolPingConnectionsNotUsedFor >= 0 && conn.getTimeElapsedSinceLastUse() > poolPingConnectionsNotUsedFor) { try { if (log.isDebugEnabled()) { log.debug("Testing connection " + conn.getRealHashCode() + " ..." ); } Connection realConn = conn.getRealConnection(); try (Statement statement = realConn.createStatement()) { statement.executeQuery(poolPingQuery).close(); } if (!realConn.getAutoCommit()) { realConn.rollback(); } if (log.isDebugEnabled()) { log.debug("Connection " + conn.getRealHashCode() + " is GOOD!" ); } } catch (Exception e) { log.warn("Execution of ping query '" + poolPingQuery + "' failed: " + e.getMessage()); try { conn.getRealConnection().close(); } catch (Exception e2) { } result = false ; if (log.isDebugEnabled()) { log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage()); } } } return result; }
2.2.6 forceCloseAll
关闭所有的 activeConnections
和 idleConnections
的连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public void forceCloseAll () { lock.lock(); try { expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword()); for (int i = state.activeConnections.size(); i > 0 ; i--) { try { PooledConnection conn = state.activeConnections.remove(i - 1 ); conn.invalidate(); Connection realConn = conn.getRealConnection(); if (!realConn.getAutoCommit()) { realConn.rollback(); } realConn.close(); } catch (Exception e) { } } for (int i = state.idleConnections.size(); i > 0 ; i--) { try { PooledConnection conn = state.idleConnections.remove(i - 1 ); conn.invalidate(); Connection realConn = conn.getRealConnection(); if (!realConn.getAutoCommit()) { realConn.rollback(); } realConn.close(); } catch (Exception e) { } } } finally { lock.unlock(); } if (log.isDebugEnabled()) { log.debug("PooledDataSource forcefully closed/removed all connections." ); } }
2.2.7 unwrapConnection
获取真实的数据库连接
1 2 3 4 5 6 7 8 9 10 11 12 public static Connection unwrapConnection (Connection conn) { if (Proxy.isProxyClass(conn.getClass())) { InvocationHandler handler = Proxy.getInvocationHandler(conn); if (handler instanceof PooledConnection) { return ((PooledConnection) handler).getRealConnection(); } } return conn; }
3. PoolState
连接池状态,记录空闲和激活的 PooledConnection 集合,以及相关的数据统计, 所在包: org.apache.ibatis.datasource.pooled.PoolState
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 private final ReentrantLock lock = new ReentrantLock ();protected PooledDataSource dataSource;protected final List<PooledConnection> idleConnections = new ArrayList <>();protected final List<PooledConnection> activeConnections = new ArrayList <>();protected long requestCount;protected long accumulatedRequestTime;protected long accumulatedCheckoutTime;protected long claimedOverdueConnectionCount;protected long accumulatedCheckoutTimeOfOverdueConnections;protected long accumulatedWaitTime;protected long hadToWaitCount;protected long badConnectionCount;
4. PooledConnection
实现 InvocationHandler 接口,池化的 Connection 对象, 所在包: org.apache.ibatis.datasource.pooled.PooledConnection
4.1 构造方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 private static final String CLOSE = "close" ;private static final Class<?>[] IFACES = { Connection.class };private final int hashCode;private final PooledDataSource dataSource;private final Connection realConnection;private final Connection proxyConnection;private long checkoutTimestamp;private long createdTimestamp;private long lastUsedTimestamp;private int connectionTypeCode;private boolean valid;public PooledConnection (Connection connection, PooledDataSource dataSource) { this .hashCode = connection.hashCode(); this .realConnection = connection; this .dataSource = dataSource; this .createdTimestamp = System.currentTimeMillis(); this .lastUsedTimestamp = System.currentTimeMillis(); this .valid = true ; this .proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this ); }
4.2 invoke 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Override public Object invoke (Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); if (CLOSE.equals(methodName)) { dataSource.pushConnection(this ); return null ; } try { if (!Object.class.equals(method.getDeclaringClass())) { checkConnection(); } return method.invoke(realConnection, args); } catch (Throwable t) { throw ExceptionUtil.unwrapThrowable(t); } }