文章

Kotlin 协程实现 Vertx 事务管理简单封装

参考项目链接:https://github.com/AiKrai001/vertx-pj

1. 背景介绍

Vertx 库的 SQL 操作较为复杂,事务管理也比较麻烦。本教程将介绍如何使用 Kotlin 协程上下文来简单封装一套简洁的事务管理方案。

2. 基础框架封装

2.1 定义基础接口

首先参考 ebean 等框架的设计,定义一个基础的 Repository 接口,这里示例就写一个execute方法,可自行实现crud等更多方法:

interface Repository<TId, TEntity> {  
  suspend fun <R> execute(sql: String): R
}

2.2 实现基础接口

创建 Repository 接口的实现类:

import com.fasterxml.jackson.core.type.TypeReference  
import io.vertx.kotlin.coroutines.coAwait  
import io.vertx.sqlclient.*  
import io.vertx.sqlclient.templates.SqlTemplate  
import mu.KotlinLogging  
import org.aikrai.vertx.jackson.JsonUtil  
import java.lang.reflect.ParameterizedType  
  
open class RepositoryImpl<TId, TEntity : Any>(  
  private val sqlClient: SqlClient  
) : Repository<TId, TEntity> {  
  private val clazz: Class<TEntity> = (this::class.java.genericSuperclass as ParameterizedType)  
    .actualTypeArguments[1] as Class<TEntity>  
  
  override suspend fun <R> execute(sql: String): R {  
    return if (sql.trimStart().startsWith("SELECT", true)) {  
      val list = SqlTemplate.forQuery(getConnection(), sql).execute(mapOf())  
        .coAwait().map { it.toJson() }  
      val jsonObject = JsonUtil.toJsonObject(list)  
      val typeReference = object : TypeReference<R>() {}  
      JsonUtil.parseObject(jsonObject, typeReference, true)  
    } else {  
      val rowCount = SqlTemplate.forUpdate(getConnection(), sql).execute(mapOf())  
        .coAwait().rowCount()  
      rowCount as R  
    }  
  }  
}

说明:

  • TId: 映射对象主键的类型
  • TEntity: 映射对象类型
  • sqlClient: 用于执行 SQL 的客户端实例
  • execute: 支持执行自定义 SQL 并返回结果

3. 实际应用示例

3.1 创建用户表相关代码

使用 Google Guice 进行依赖注入:

@ImplementedBy(UserRepositoryImpl::class)  
interface UserRepository : Repository<Long, User> {  
}

class UserRepositoryImpl @Inject constructor(  
  sqlClient: SqlClient  
) : RepositoryImpl<Long, User>(sqlClient), UserRepository {  
}

3.2 配置依赖注入

object InjectConfig {  
  fun configure(vertx: Vertx, configMap: Map<String, Any>): Injector {  
    return Guice.createInjector(InjectorModule(vertx, configMap))  
  }  
}

class InjectorModule(
  private val vertx: Vertx,  
  private val configMap: Map<String, Any>  
) : AbstractModule() {
  override fun configure() {  
    val pool = getDbPool().also { initTxMgr(it) }   
    bind(Pool::class.java).toInstance(pool)  
    bind(SqlClient::class.java).toInstance(pool)  
  }  
  
  private fun getDbPool(): Pool {  
    val url = configMap["databases.url"].toString()  
    val poolOptions = PoolOptions().setMaxSize(10)  
    val pool = MySQLBuilder.pool().connectingTo(url).with(poolOptions).using(vertx).build()  
    return pool  
  }  
}

3.3 说明

基础代码就编写完成了。下面开始编写事务管理相关代码。

想过三种方法实现:使用自定义注解配合Google Guice注册拦截器实现aop,类似spring事务的实现方式,但是Google Guice不支持携程;协程上下文CoroutineContext;使用类似装饰器方式实现。

4. 事务管理实现

4.1 事务上下文元素

class TxCtxElem(  
  val sqlConnection: SqlConnection,  
  val transaction: Transaction,  
  val isActive: Boolean = true,  
  val isNested: Boolean = false,  
  val transactionStack: Stack<TxCtxElem>,  
  val index: Int = transactionStack.size,  
  val transactionId: String = UUID.randomUUID().toString()  
) : CoroutineContext.Element {  
  companion object Key : CoroutineContext.Key<TxCtxElem>  
  override val key: CoroutineContext.Key<*> = Key  
}

4.2 事务管理器

核心实现类 TxMgr

import io.vertx.kotlin.coroutines.coAwait  
import io.vertx.sqlclient.Pool  
import io.vertx.sqlclient.SqlConnection  
import io.vertx.sqlclient.Transaction  
import kotlinx.coroutines.CoroutineScope  
import kotlinx.coroutines.withContext  
import mu.KotlinLogging  
import org.aikrai.vertx.utlis.Meta  
import java.util.*  
import java.util.concurrent.ConcurrentHashMap  
import java.util.concurrent.atomic.AtomicReference  
import kotlin.coroutines.CoroutineContext  
import kotlin.coroutines.coroutineContext  
  
suspend fun <T> withTransaction(block: suspend CoroutineScope.() -> T): Any? {  
  return TxMgrHolder.txMgr.withTransaction(block)  
}  
  
object TxMgrHolder {  
  private val _txMgr = AtomicReference<TxMgr?>(null)  
  
  val txMgr: TxMgr  
    get() = _txMgr.get() ?: throw Meta.failure(  
      "TransactionError",  
      "TxMgr(TransactionManager) 尚未初始化。请先调用 initTxMgr()。"  
    )  
    
  fun initTxMgr(pool: Pool) {  
    if (_txMgr.get() != null) return  
    val newManager = TxMgr(pool)  
    _txMgr.compareAndSet(null, newManager)  
  }  
}  
  
class TxMgr(  
  private val pool: Pool  
) {  
  private val logger = KotlinLogging.logger { }  
  private val transactionStackMap = ConcurrentHashMap<CoroutineContext, Stack<TxCtxElem>>()  
  
  suspend fun <T> withTransaction(block: suspend CoroutineScope.() -> T): Any? {  
    val currentContext = coroutineContext  
    val transactionStack = currentContext[TxCtxElem]?.transactionStack ?: Stack<TxCtxElem>()  
    // 外层事务,嵌套事务,都创建新的连接和事务。实现外层事务回滚时所有嵌套事务回滚,嵌套事务回滚不影响外部事务  
    val connection: SqlConnection = pool.connection.coAwait()  
    val transaction: Transaction = connection.begin().coAwait()  
  
    return try {  
      val txCtxElem =  
        TxCtxElem(connection, transaction, true, transactionStack.isNotEmpty(), transactionStack)  
      transactionStack.push(txCtxElem)  
      logger.debug { (if (txCtxElem.isNested) "嵌套" else "") + "事务Id:" + txCtxElem.transactionId + "开始" }  
  
      withContext(currentContext + txCtxElem) {  
        val result = block()  
        if (txCtxElem.index == 0) {  
          while (transactionStack.isNotEmpty()) {  
            val txCtx = transactionStack.pop()  
            txCtx.transaction.commit().coAwait()  
            logger.debug { (if (txCtx.isNested) "嵌套" else "") + "事务Id:" + txCtx.transactionId + "提交" }  
          }  
        }  
        result  
      }  
    } catch (e: Exception) {  
      logger.error(e) { "Transaction failed, rollback" }  
      if (transactionStack.isNotEmpty() && !transactionStack.peek().isNested) {  
        // 外层事务失败,回滚所有事务  
        logger.error { "Rolling back all transactions" }  
        while (transactionStack.isNotEmpty()) {  
          val txCtxElem = transactionStack.pop()  
          txCtxElem.transaction.rollback().coAwait()  
          logger.debug { (if (txCtxElem.isNested) "嵌套" else "") + "事务Id:" + txCtxElem.transactionId + "回滚" }  
        }  
        throw e  
      } else {  
        // 嵌套事务失败,只回滚当前事务  
        val txCtxElem = transactionStack.pop()  
        txCtxElem.transaction.rollback().coAwait()  
        logger.debug(e) { (if (txCtxElem.isNested) "嵌套" else "") + "事务Id:" + txCtxElem.transactionId + "回滚" }  
      }  
    } finally {  
      if (transactionStack.isEmpty()) {  
        transactionStackMap.remove(currentContext) // 清理上下文  
        connection.close() // 仅在外层事务时关闭连接  
      }  
    }  
  }  
}

实现说明:

  1. 使用协程上下文管理事务状态
  2. 支持事务嵌套,采用独立连接策略
  3. 实现事务提交和回滚的完整生命周期管理

4.3 修改 Repository 实现

open class RepositoryImpl<TId, TEntity : Any>(  
  private val sqlClient: SqlClient  
) : Repository<TId, TEntity> {
  private suspend fun getConnection(): SqlClient {  
    return if (TxCtx.isTransactionActive(coroutineContext)) {  
      TxCtx.currentSqlConnection(coroutineContext) ?: sqlClient
    } else {  
      sqlClient  
    }  
  }
  // ... 其他实现代码
}

5. 使用方式

suspend fun testTransaction() {  
  withTransaction {  
    val execute1 = userRepository.execute<Int>(
      "update sys_user set email = '88888' where user_name = '运若汐'"
    )  
    println("运若汐: $execute1")  
    throw Meta.failure("test transaction", "test transaction")  
  }  
}

6. 注意事项

  1. withTransaction中包裹的方法必须使用 RepositoryImpl 中使用 getConnection() 获取连接执行sql的方法。
  2. 不能在 withTransaction 中开启新的协程上下文
  3. 事务尽量不要嵌套,目前嵌套处理逻辑是外层事务回滚会导致所有嵌套事务回滚,嵌套事务回滚不影响外层事务,可以自行修改。
  4. 事务尽量不要嵌套,可能导致性能问题。
  5. 同级事务会被视为嵌套事务处理。

若有错误多多包涵,可左下角邮箱联系。

License:  CC BY 4.0