注意事項 2016/07/20追加

本記事はpgpool-Ⅱのバージョン3.2.9で検証したものである。
pgpool-Ⅱのロードバランスの条件についてのドキュメントが、バージョン3.4.3から大きく変わっている。
replicationモードにおいて、v3.2.9では、BEGINコマンド発行後はselectしか実行しなくてもロードバランスしてくれなかった。
v3.4.3以降では、BEGINコマンド発行後でもmaster slaveモードと同じように、更新系クエリが実行されるまでロードバランスしてくれるそうだ。

未検証ではあるが、注意事項として追記する。
また最新版を常に確認するのが良い。http://www.pgpool.net/docs/latest/pgpool-ja.html

v3.2.9のドキュメントの差異抜粋
http://www.pgpool.net/docs/pgpool-II-3.2.9/doc/pgpool-ja.html

ロードバランスの条件について

load_balance_mode = true を設定した場合、以下の条件のすべてを満たした時に SELECTなどの問い合わせがロードバランスされます。

  • 問い合わせが明示的なトランクザションブロックの内側にない(つまり、BEGINを発行していない)
  • マスタースレーブモードの場合、更に以下の条件が満たされなければなりません。V3.0 -

    • 一時テーブルを使っていない
    • unloggedテーブルを使っていない
    • システムカタログを使っていない
    • トランザクションブロックの内側であっても上記条件と下記条件を満たせばロードバランスされます。

      • トランザクション分離レベルがSERIALIZABLEでない
      • トランザクション内で更新を伴うクエリが実行されていない(更新を伴うクエリが実行されるまではロードバランスされます)

注意: JDBC ドライバなどのように、ドライバ内で autocommit の有効・無効のオプションがある場合、 autocommit を無効にすると、ドライバが内部で BEGIN コマンドを実行する関係上、 正しくロードバランスされない可能性があります。 クエリをロードバランスさせたい場合は autocommit を有効にしてください。 たとえばJDBCであれば setAutoCommit(true) を実行してください。

v3.4.3のドキュメントの差異抜粋
http://www.pgpool.net/docs/pgpool-II-3.4.3/doc/pgpool-ja.html

ロードバランスの条件について

  • 問い合わせが明示的なトランクザションブロックの内側にない(つまり、BEGINを発行していない)

    • ただし、以下の条件が満たされればトランザクションブロックの内側であってもロードバランスの対象となります。

      • トランザクション分離レベルがSERIALIZABLEでない
      • トランザクション内で更新を伴うクエリが実行されていない
        (更新を伴うクエリが実行されるまではロードバランスされます。 ここで「更新を伴うクエリ」とは、SELECT以外のDDLやDMLを指します。 black/white function listで指定される更新関数を含むSELECTは更新を伴うクエリとは見なされません。 この仕様は将来変更される可能性があります)

      • もしblack/white function listが空の場合は、関数を持つSELECTは、更新を伴うクエリとは見なされません。

  • マスタースレーブモードの場合、更に以下の条件が満たされなければなりません。V3.0 -

    • 一時テーブルを使っていない
    • unloggedテーブルを使っていない
    • システムカタログを使っていない

注意: JDBC ドライバなどのように、ドライバ内で autocommit の有効・無効のオプションがある場合、 autocommit を無効にすると、ドライバが内部で BEGIN コマンドを実行し、明示的なトランザクションが開始されます。 この場合、トランザクション内における上記のロードバランスの制限事項が適用されます。

本文

トランザクションが開始されるまでSELECTをload balanceしてくれるpgpool-Ⅱのreplicationモードを使用するため、更新系SQLが発行されてはじめてトランザクションが発行されるようにした。

Actionに@TransactionAttribute(TransactionAttributeType.NEVER)を、Serviceに@TransactionAttribute(TransactionAttributeType.SUPPORTS)を付与し、トランザクションが開始されないようにして、S2JDBCのJdbcManagerImplで、更新系SQLを発行するときにトランザクションが開始されていなければ開始するようにした。
また、ActionInterceptorでaction終了時にトランザクションを終了した。

ちなみにselect for updateもトランザクション制御が必要なので、select for updateを示すAnnotationで対処した。
試していないが、AnnotationよりもJdbcManagerImpl_のメソッドが返すAutoSelect、SqlSelect、SqlFileSelectを拡張して、getResultList()getSingleResult()内で、実行されるSQLが格納されるインスタンス変数executedSqlを調べることでfor updateか否かを判断した方が良さそう。。。

JdbcManagerImpl_.java

public class JdbcManagerImpl_ extends JdbcManagerImpl implements JdbcManager, JdbcManagerImplementor {

    @Resource
    protected UserTransaction userTransaction;

    public JdbcManagerImpl_() {
    }

    /**
     * トランザクションを開始する
     */
    protected void beginTransaction() {
        try {
            if (!hasTransaction()) {
                userTransaction.begin();
            }
        } catch (SystemException | NotSupportedException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * SelectForUpdateのAnnotationがDaoのメソッドについている場合、トランザクションを開始する
     */
    protected void beginTransactionForSelectForUpdate() {
        if (hasTransaction()) {
            return;
        }
        StackTraceElement[] stes = Thread.currentThread().getStackTrace();
        for (StackTraceElement ste : stes) {
            String callerClsName = ste.getClassName();
            if (!callerClsName.endsWith("Dao")) {
                continue;
            }

            Class<?> callerCls;
            try {
                callerCls = Class.forName(callerClsName);
            } catch (ClassNotFoundException e) {
                throw new InternalError();
            }
            String callerMethodName = ste.getMethodName();
            Method[] methods = callerCls.getDeclaredMethods();
            for (Method method : methods) {
                if (method.getName().equals(callerMethodName)) {
                    Annotation[] annotations = method.getAnnotations();
                    for (Annotation annotation : annotations) {
                        if (annotation.annotationType().equals(SelectForUpdate.class)) {
                            beginTransaction();
                            return;
                        }
                    }
                }
            }
        }
    }

    @Override
    public <T> AutoDelete<T> delete(final T entity) {
        beginTransaction();
        return super.delete(entity);
    }

    @Override
    public SqlFileUpdate updateBySqlFile(String path, Object parameter) {
        beginTransaction();
        return super.updateBySqlFile(path, parameter);
    }

    @Override
    public SqlUpdate updateBySql(String sql, Class<?>... paramClasses) {
        beginTransaction();
        return super.updateBySql(sql, paramClasses);
    }

    @Override
    public <T> AutoSelect<T> from(Class<T> baseClass) {
        beginTransactionForSelectForUpdate();
        return super.from(baseClass);
    }

    @Override
    public <T> SqlSelect<T> selectBySql(Class<T> baseClass, String sql,
            Object... params) {
        beginTransactionForSelectForUpdate();
        return super.selectBySql(baseClass, sql, params);
    }
    
    ....(省略)....

    @Override
    public DbmsDialect getDialect() {
        return super.getDialect();
    }

    @Override
    public void setDialect(DbmsDialect dialect) {
        super.setDialect(dialect);
    }

    @Override
    public EntityMetaFactory getEntityMetaFactory() {
        return super.getEntityMetaFactory();
    }

    @Override
    public void setEntityMetaFactory(EntityMetaFactory entityMetaFactory) {
        super.setEntityMetaFactory(entityMetaFactory);
    }

    @Override
    public PersistenceConvention getPersistenceConvention() {
        return super.getPersistenceConvention();
    }

    @Override
    public void setPersistenceConvention(
            PersistenceConvention persistenceConvention) {
        super.setPersistenceConvention(persistenceConvention);
    }
}

ActionInterceptor.java

    @Resource
    protected UserTransaction userTransaction;

    /**
     * invoke
     * @param methodInvocation MethodInvocation
     * @return Object
     * @throws Throwable Throwable
     */
    public Object invoke(MethodInvocation methodInvocation) throws Throwable {

        // 戻り値を宣言
        Object result = null;
        try {
            result = methodInvocation.proceed();
        } catch (Exception ex) {
            return result;
        } finally {
            try {
                int status = userTransaction.getStatus();
                if (status == Status.STATUS_ACTIVE) {
                    userTransaction.commit();
                } else if (status == Status.STATUS_MARKED_ROLLBACK || status == Status.STATUS_ROLLEDBACK) {
                    userTransaction.rollback();
                }
            } catch (IllegalStateException | SecurityException | SystemException e) {
                throw new RuntimeException(e);
            }
        }
        return result;
    }