Open Source Repository

Home /spring/spring-jms-3.0.5 | Repository Home



org/springframework/jms/connection/ConnectionFactoryUtils.java
/*
 * Copyright 2002-2009 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.jms.connection;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.transaction.support.ResourceHolderSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;

/**
 * Helper class for managing a JMS {@link javax.jms.ConnectionFactory}, in particular
 * for obtaining transactional JMS resources for a given ConnectionFactory.
 *
 <p>Mainly for internal use within the framework. Used by
 {@link org.springframework.jms.core.JmsTemplate} as well as
 {@link org.springframework.jms.listener.DefaultMessageListenerContainer}.
 *
 @author Juergen Hoeller
 @since 2.0
 @see SmartConnectionFactory
 */
public abstract class ConnectionFactoryUtils {

  private static final Log logger = LogFactory.getLog(ConnectionFactoryUtils.class);


  /**
   * Release the given Connection, stopping it (if necessary) and eventually closing it.
   <p>Checks {@link SmartConnectionFactory#shouldStop}, if available.
   * This is essentially a more sophisticated version of
   {@link org.springframework.jms.support.JmsUtils#closeConnection}.
   @param con the Connection to release
   * (if this is <code>null</code>, the call will be ignored)
   @param cf the ConnectionFactory that the Connection was obtained from
   * (may be <code>null</code>)
   @param started whether the Connection might have been started by the application
   @see SmartConnectionFactory#shouldStop
   @see org.springframework.jms.support.JmsUtils#closeConnection
   */
  public static void releaseConnection(Connection con, ConnectionFactory cf, boolean started) {
    if (con == null) {
      return;
    }
    if (started && cf instanceof SmartConnectionFactory && ((SmartConnectionFactorycf).shouldStop(con)) {
      try {
        con.stop();
      }
      catch (Throwable ex) {
        logger.debug("Could not stop JMS Connection before closing it", ex);
      }
    }
    try {
      con.close();
    }
    catch (Throwable ex) {
      logger.debug("Could not close JMS Connection", ex);
    }
  }

  /**
   * Return the innermost target Session of the given Session. If the given
   * Session is a proxy, it will be unwrapped until a non-proxy Session is
   * found. Otherwise, the passed-in Session will be returned as-is.
   @param session the Session proxy to unwrap
   @return the innermost target Session, or the passed-in one if no proxy
   @see SessionProxy#getTargetSession()
   */
  public static Session getTargetSession(Session session) {
    Session sessionToUse = session;
    while (sessionToUse instanceof SessionProxy) {
      sessionToUse = ((SessionProxysessionToUse).getTargetSession();
    }
    return sessionToUse;
  }



  /**
   * Determine whether the given JMS Session is transactional, that is,
   * bound to the current thread by Spring's transaction facilities.
   @param session the JMS Session to check
   @param cf the JMS ConnectionFactory that the Session originated from
   @return whether the Session is transactional
   */
  public static boolean isSessionTransactional(Session session, ConnectionFactory cf) {
    if (session == null || cf == null) {
      return false;
    }
    JmsResourceHolder resourceHolder = (JmsResourceHolderTransactionSynchronizationManager.getResource(cf);
    return (resourceHolder != null && resourceHolder.containsSession(session));
  }


  /**
   * Obtain a JMS Session that is synchronized with the current transaction, if any.
   @param cf the ConnectionFactory to obtain a Session for
   @param existingCon the existing JMS Connection to obtain a Session for
   * (may be <code>null</code>)
   @param synchedLocalTransactionAllowed whether to allow for a local JMS transaction
   * that is synchronized with a Spring-managed transaction (where the main transaction
   * might be a JDBC-based one for a specific DataSource, for example), with the JMS
   * transaction committing right after the main transaction. If not allowed, the given
   * ConnectionFactory needs to handle transaction enlistment underneath the covers.
   @return the transactional Session, or <code>null</code> if none found
   @throws JMSException in case of JMS failure
   */
  public static Session getTransactionalSession(
      final ConnectionFactory cf, final Connection existingCon, final boolean synchedLocalTransactionAllowed)
      throws JMSException {

    return doGetTransactionalSession(cf, new ResourceFactory() {
      public Session getSession(JmsResourceHolder holder) {
        return holder.getSession(Session.class, existingCon);
      }
      public Connection getConnection(JmsResourceHolder holder) {
        return (existingCon != null ? existingCon : holder.getConnection());
      }
      public Connection createConnection() throws JMSException {
        return cf.createConnection();
      }
      public Session createSession(Connection conthrows JMSException {
        return con.createSession(synchedLocalTransactionAllowed, Session.AUTO_ACKNOWLEDGE);
      }
      public boolean isSynchedLocalTransactionAllowed() {
        return synchedLocalTransactionAllowed;
      }
    }true);
  }

  /**
   * Obtain a JMS QueueSession that is synchronized with the current transaction, if any.
   <p>Mainly intended for use with the JMS 1.0.2 API.
   @param cf the ConnectionFactory to obtain a Session for
   @param existingCon the existing JMS Connection to obtain a Session for
   * (may be <code>null</code>)
   @param synchedLocalTransactionAllowed whether to allow for a local JMS transaction
   * that is synchronized with a Spring-managed transaction (where the main transaction
   * might be a JDBC-based one for a specific DataSource, for example), with the JMS
   * transaction committing right after the main transaction. If not allowed, the given
   * ConnectionFactory needs to handle transaction enlistment underneath the covers.
   @return the transactional Session, or <code>null</code> if none found
   @throws JMSException in case of JMS failure
   */
  public static QueueSession getTransactionalQueueSession(
      final QueueConnectionFactory cf, final QueueConnection existingCon, final boolean synchedLocalTransactionAllowed)
      throws JMSException {

    return (QueueSessiondoGetTransactionalSession(cf, new ResourceFactory() {
      public Session getSession(JmsResourceHolder holder) {
        return holder.getSession(QueueSession.class, existingCon);
      }
      public Connection getConnection(JmsResourceHolder holder) {
        return (existingCon != null ? existingCon : holder.getConnection(QueueConnection.class));
      }
      public Connection createConnection() throws JMSException {
        return cf.createQueueConnection();
      }
      public Session createSession(Connection conthrows JMSException {
        return ((QueueConnectioncon).createQueueSession(synchedLocalTransactionAllowed, Session.AUTO_ACKNOWLEDGE);
      }
      public boolean isSynchedLocalTransactionAllowed() {
        return synchedLocalTransactionAllowed;
      }
    }true);
  }

  /**
   * Obtain a JMS TopicSession that is synchronized with the current transaction, if any.
   <p>Mainly intended for use with the JMS 1.0.2 API.
   @param cf the ConnectionFactory to obtain a Session for
   @param existingCon the existing JMS Connection to obtain a Session for
   * (may be <code>null</code>)
   @param synchedLocalTransactionAllowed whether to allow for a local JMS transaction
   * that is synchronized with a Spring-managed transaction (where the main transaction
   * might be a JDBC-based one for a specific DataSource, for example), with the JMS
   * transaction committing right after the main transaction. If not allowed, the given
   * ConnectionFactory needs to handle transaction enlistment underneath the covers.
   @return the transactional Session, or <code>null</code> if none found
   @throws JMSException in case of JMS failure
   */
  public static TopicSession getTransactionalTopicSession(
      final TopicConnectionFactory cf, final TopicConnection existingCon, final boolean synchedLocalTransactionAllowed)
      throws JMSException {

    return (TopicSessiondoGetTransactionalSession(cf, new ResourceFactory() {
      public Session getSession(JmsResourceHolder holder) {
        return holder.getSession(TopicSession.class, existingCon);
      }
      public Connection getConnection(JmsResourceHolder holder) {
        return (existingCon != null ? existingCon : holder.getConnection(TopicConnection.class));
      }
      public Connection createConnection() throws JMSException {
        return cf.createTopicConnection();
      }
      public Session createSession(Connection conthrows JMSException {
        return ((TopicConnectioncon).createTopicSession(synchedLocalTransactionAllowed, Session.AUTO_ACKNOWLEDGE);
      }
      public boolean isSynchedLocalTransactionAllowed() {
        return synchedLocalTransactionAllowed;
      }
    }true);
  }

  /**
   * Obtain a JMS Session that is synchronized with the current transaction, if any.
   <p>This <code>doGetTransactionalSession</code> variant always starts the underlying
   * JMS Connection, assuming that the Session will be used for receiving messages.
   @param connectionFactory the JMS ConnectionFactory to bind for
   * (used as TransactionSynchronizationManager key)
   @param resourceFactory the ResourceFactory to use for extracting or creating
   * JMS resources
   @return the transactional Session, or <code>null</code> if none found
   @throws JMSException in case of JMS failure
   @see #doGetTransactionalSession(javax.jms.ConnectionFactory, ResourceFactory, boolean)
   */
  public static Session doGetTransactionalSession(
      ConnectionFactory connectionFactory, ResourceFactory resourceFactorythrows JMSException {

    return doGetTransactionalSession(connectionFactory, resourceFactory, true);
  }

  /**
   * Obtain a JMS Session that is synchronized with the current transaction, if any.
   @param connectionFactory the JMS ConnectionFactory to bind for
   * (used as TransactionSynchronizationManager key)
   @param resourceFactory the ResourceFactory to use for extracting or creating
   * JMS resources
   @param startConnection whether the underlying JMS Connection approach should be
   * started in order to allow for receiving messages. Note that a reused Connection
   * may already have been started before, even if this flag is <code>false</code>.
   @return the transactional Session, or <code>null</code> if none found
   @throws JMSException in case of JMS failure
   */
  public static Session doGetTransactionalSession(
      ConnectionFactory connectionFactory, ResourceFactory resourceFactory, boolean startConnection)
      throws JMSException {

    Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
    Assert.notNull(resourceFactory, "ResourceFactory must not be null");

    JmsResourceHolder resourceHolder =
        (JmsResourceHolderTransactionSynchronizationManager.getResource(connectionFactory);
    if (resourceHolder != null) {
      Session session = resourceFactory.getSession(resourceHolder);
      if (session != null) {
        if (startConnection) {
          Connection con = resourceFactory.getConnection(resourceHolder);
          if (con != null) {
            con.start();
          }
        }
        return session;
      }
      if (resourceHolder.isFrozen()) {
        return null;
      }
    }
    if (!TransactionSynchronizationManager.isSynchronizationActive()) {
      return null;
    }
    JmsResourceHolder resourceHolderToUse = resourceHolder;
    if (resourceHolderToUse == null) {
      resourceHolderToUse = new JmsResourceHolder(connectionFactory);
    }
    Connection con = resourceFactory.getConnection(resourceHolderToUse);
    Session session = null;
    try {
      boolean isExistingCon = (con != null);
      if (!isExistingCon) {
        con = resourceFactory.createConnection();
        resourceHolderToUse.addConnection(con);
      }
      session = resourceFactory.createSession(con);
      resourceHolderToUse.addSession(session, con);
      if (startConnection) {
        con.start();
      }
    }
    catch (JMSException ex) {
      if (session != null) {
        try {
          session.close();
        }
        catch (Throwable ex2) {
          // ignore
        }
      }
      if (con != null) {
        try {
          con.close();
        }
        catch (Throwable ex2) {
          // ignore
        }
      }
      throw ex;
    }
    if (resourceHolderToUse != resourceHolder) {
      TransactionSynchronizationManager.registerSynchronization(
          new JmsResourceSynchronization(
              resourceHolderToUse, connectionFactory, resourceFactory.isSynchedLocalTransactionAllowed()));
      resourceHolderToUse.setSynchronizedWithTransaction(true);
      TransactionSynchronizationManager.bindResource(connectionFactory, resourceHolderToUse);
    }
    return session;
  }


  /**
   * Callback interface for resource creation.
   * Serving as argument for the <code>doGetTransactionalSession</code> method.
   */
  public interface ResourceFactory {

    /**
     * Fetch an appropriate Session from the given JmsResourceHolder.
     @param holder the JmsResourceHolder
     @return an appropriate Session fetched from the holder,
     * or <code>null</code> if none found
     */
    Session getSession(JmsResourceHolder holder);

    /**
     * Fetch an appropriate Connection from the given JmsResourceHolder.
     @param holder the JmsResourceHolder
     @return an appropriate Connection fetched from the holder,
     * or <code>null</code> if none found
     */
    Connection getConnection(JmsResourceHolder holder);

    /**
     * Create a new JMS Connection for registration with a JmsResourceHolder.
     @return the new JMS Connection
     @throws JMSException if thrown by JMS API methods
     */
    Connection createConnection() throws JMSException;

    /**
     * Create a new JMS Session for registration with a JmsResourceHolder.
     @param con the JMS Connection to create a Session for
     @return the new JMS Session
     @throws JMSException if thrown by JMS API methods
     */
    Session createSession(Connection conthrows JMSException;

    /**
     * Return whether to allow for a local JMS transaction that is synchronized with
     * a Spring-managed transaction (where the main transaction might be a JDBC-based
     * one for a specific DataSource, for example), with the JMS transaction
     * committing right after the main transaction.
     @return whether to allow for synchronizing a local JMS transaction
     */
    boolean isSynchedLocalTransactionAllowed();
  }


  /**
   * Callback for resource cleanup at the end of a non-native JMS transaction
   * (e.g. when participating in a JtaTransactionManager transaction).
   @see org.springframework.transaction.jta.JtaTransactionManager
   */
  private static class JmsResourceSynchronization extends ResourceHolderSynchronization<JmsResourceHolder, Object> {

    private final boolean transacted;

    public JmsResourceSynchronization(JmsResourceHolder resourceHolder, Object resourceKey, boolean transacted) {
      super(resourceHolder, resourceKey);
      this.transacted = transacted;
    }

    protected boolean shouldReleaseBeforeCompletion() {
      return !this.transacted;
    }

    protected void processResourceAfterCommit(JmsResourceHolder resourceHolder) {
      try {
        resourceHolder.commitAll();
      }
      catch (JMSException ex) {
        throw new SynchedLocalTransactionFailedException("Local JMS transaction failed to commit", ex);
      }
    }

    protected void releaseResource(JmsResourceHolder resourceHolder, Object resourceKey) {
      resourceHolder.closeAll();
    }
  }

}