Open Source Repository

Home /spring/spring-jms-3.0.5 | Repository Home



org/springframework/jms/listener/SimpleMessageListenerContainer.java
/*
 * Copyright 2002-2010 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.jms.listener;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;

import org.springframework.jms.support.JmsUtils;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;

/**
 * Message listener container that uses the plain JMS client API's
 <code>MessageConsumer.setMessageListener()</code> method to
 * create concurrent MessageConsumers for the specified listeners.
 *
 <p>This is the simplest form of a message listener container.
 * It creates a fixed number of JMS Sessions to invoke the listener,
 * not allowing for dynamic adaptation to runtime demands. Its main
 * advantage is its low level of complexity and the minimum requirements
 * on the JMS provider: Not even the ServerSessionPool facility is required.
 *
 <p>See the {@link AbstractMessageListenerContainer} javadoc for details
 * on acknowledge modes and transaction options.
 *
 <p>For a different style of MessageListener handling, through looped
 <code>MessageConsumer.receive()</code> calls that also allow for
 * transactional reception of messages (registering them with XA transactions),
 * see {@link DefaultMessageListenerContainer}.
 *
 @author Juergen Hoeller
 @since 2.0
 @see javax.jms.MessageConsumer#setMessageListener
 @see DefaultMessageListenerContainer
 @see org.springframework.jms.listener.endpoint.JmsMessageEndpointManager
 */
public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer implements ExceptionListener {

  private boolean pubSubNoLocal = false;

  private int concurrentConsumers = 1;

  private Executor taskExecutor;

  private Set<Session> sessions;

  private Set<MessageConsumer> consumers;

  private final Object consumersMonitor = new Object();


  /**
   * Set whether to inhibit the delivery of messages published by its own connection.
   * Default is "false".
   @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic, String, boolean)
   */
  public void setPubSubNoLocal(boolean pubSubNoLocal) {
    this.pubSubNoLocal = pubSubNoLocal;
  }

  /**
   * Return whether to inhibit the delivery of messages published by its own connection.
   */
  protected boolean isPubSubNoLocal() {
    return this.pubSubNoLocal;
  }

  /**
   * Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple
   * upper limit String, e.g. "10".
   <p>This listener container will always hold on to the maximum number of
   * consumers {@link #setConcurrentConsumers} since it is unable to scale.
   <p>This property is primarily supported for configuration compatibility with
   {@link DefaultMessageListenerContainer}. For this local listener container,
   * generally use {@link #setConcurrentConsumers} instead.
   */
  public void setConcurrency(String concurrency) {
    try {
      int separatorIndex = concurrency.indexOf('-');
      if (separatorIndex != -1) {
        setConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1, concurrency.length())));
      }
      else {
        setConcurrentConsumers(Integer.parseInt(concurrency));
      }
    }
    catch (NumberFormatException ex) {
      throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " +
          "single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported. " +
          "Note that SimpleMessageListenerContainer will effectively ignore the minimum value and " +
          "always keep a fixed number of consumers according to the maximum value.");
    }
  }

  /**
   * Specify the number of concurrent consumers to create. Default is 1.
   <p>Raising the number of concurrent consumers is recommendable in order
   * to scale the consumption of messages coming in from a queue. However,
   * note that any ordering guarantees are lost once multiple consumers are
   * registered. In general, stick with 1 consumer for low-volume queues.
   <p><b>Do not raise the number of concurrent consumers for a topic.</b>
   * This would lead to concurrent consumption of the same message,
   * which is hardly ever desirable.
   */
  public void setConcurrentConsumers(int concurrentConsumers) {
    Assert.isTrue(concurrentConsumers > 0"'concurrentConsumers' value must be at least 1 (one)");
    this.concurrentConsumers = concurrentConsumers;
  }

  /**
   * Set the Spring TaskExecutor to use for executing the listener once
   * a message has been received by the provider.
   <p>Default is none, that is, to run in the JMS provider's own receive thread,
   * blocking the provider's receive endpoint while executing the listener.
   <p>Specify a TaskExecutor for executing the listener in a different thread,
   * rather than blocking the JMS provider, usually integrating with an existing
   * thread pool. This allows to keep the number of concurrent consumers low (1)
   * while still processing messages concurrently (decoupled from receiving!).
   <p><b>NOTE: Specifying a TaskExecutor for listener execution affects
   * acknowledgement semantics.</b> Messages will then always get acknowledged
   * before listener execution, with the underlying Session immediately reused
   * for receiving the next message. Using this in combination with a transacted
   * session or with client acknowledgement will lead to unspecified results!
   <p><b>NOTE: Concurrent listener execution via a TaskExecutor will lead
   * to concurrent processing of messages that have been received by the same
   * underlying Session.</b> As a consequence, it is not recommended to use
   * this setting with a {@link SessionAwareMessageListener}, at least not
   * if the latter performs actual work on the given Session. A standard
   {@link javax.jms.MessageListener} will work fine, in general.
   @see #setConcurrentConsumers
   @see org.springframework.core.task.SimpleAsyncTaskExecutor
   @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
   */
  public void setTaskExecutor(Executor taskExecutor) {
    this.taskExecutor = taskExecutor;
  }

  protected void validateConfiguration() {
    super.validateConfiguration();
    if (isSubscriptionDurable() && this.concurrentConsumers != 1) {
      throw new IllegalArgumentException("Only 1 concurrent consumer supported for durable subscription");
    }
  }


  //-------------------------------------------------------------------------
  // Implementation of AbstractMessageListenerContainer's template methods
  //-------------------------------------------------------------------------

  /**
   * Always use a shared JMS Connection.
   */
  protected final boolean sharedConnectionEnabled() {
    return true;
  }

  /**
   * Creates the specified number of concurrent consumers,
   * in the form of a JMS Session plus associated MessageConsumer.
   @see #createListenerConsumer
   */
  protected void doInitialize() throws JMSException {
    establishSharedConnection();
    initializeConsumers();
  }

  /**
   * Re-initializes this container's JMS message consumers,
   * if not initialized already.
   */
  protected void doStart() throws JMSException {
    super.doStart();
    initializeConsumers();
  }

  /**
   * Registers this listener container as JMS ExceptionListener on the shared connection.
   */
  protected void prepareSharedConnection(Connection connectionthrows JMSException {
    super.prepareSharedConnection(connection);
    connection.setExceptionListener(this);
  }

  /**
   * JMS ExceptionListener implementation, invoked by the JMS provider in
   * case of connection failures. Re-initializes this listener container's
   * shared connection and its sessions and consumers.
   @param ex the reported connection exception
   */
  public void onException(JMSException ex) {
    // First invoke the user-specific ExceptionListener, if any.
    invokeExceptionListener(ex);

    // Now try to recover the shared Connection and all consumers...
    if (logger.isInfoEnabled()) {
      logger.info("Trying to recover from JMS Connection exception: " + ex);
    }
    try {
      synchronized (this.consumersMonitor) {
        this.sessions = null;
        this.consumers = null;
      }
      refreshSharedConnection();
      initializeConsumers();
      logger.info("Successfully refreshed JMS Connection");
    }
    catch (JMSException recoverEx) {
      logger.debug("Failed to recover JMS Connection", recoverEx);
      logger.error("Encountered non-recoverable JMSException", ex);
    }
  }

  /**
   * Initialize the JMS Sessions and MessageConsumers for this container.
   @throws JMSException in case of setup failure
   */
  protected void initializeConsumers() throws JMSException {
    // Register Sessions and MessageConsumers.
    synchronized (this.consumersMonitor) {
      if (this.consumers == null) {
        this.sessions = new HashSet<Session>(this.concurrentConsumers);
        this.consumers = new HashSet<MessageConsumer>(this.concurrentConsumers);
        Connection con = getSharedConnection();
        for (int i = 0; i < this.concurrentConsumers; i++) {
          Session session = createSession(con);
          MessageConsumer consumer = createListenerConsumer(session);
          this.sessions.add(session);
          this.consumers.add(consumer);
        }
      }
    }
  }

  /**
   * Create a MessageConsumer for the given JMS Session,
   * registering a MessageListener for the specified listener.
   @param session the JMS Session to work on
   @return the MessageConsumer
   @throws JMSException if thrown by JMS methods
   @see #executeListener
   */
  protected MessageConsumer createListenerConsumer(final Session sessionthrows JMSException {
    Destination destination = getDestination();
    if (destination == null) {
      destination = resolveDestinationName(session, getDestinationName());
    }
    MessageConsumer consumer = createConsumer(session, destination);

    if (this.taskExecutor != null) {
      consumer.setMessageListener(new MessageListener() {
        public void onMessage(final Message message) {
          taskExecutor.execute(new Runnable() {
            public void run() {
              processMessage(message, session);
            }
          });
        }
      });
    }
    else {
      consumer.setMessageListener(new MessageListener() {
        public void onMessage(Message message) {
          processMessage(message, session);
        }
      });
    }

    return consumer;
  }

  /**
   * Process a message received from the provider.
   <p>Executes the listener, exposing the current JMS Session as
   * thread-bound resource (if "exposeListenerSession" is "true").
   @param message the received JMS Message
   @param session the JMS Session to operate on
   @see #executeListener
   @see #setExposeListenerSession
   */
  protected void processMessage(Message message, Session session) {
    boolean exposeResource = isExposeListenerSession();
    if (exposeResource) {
      TransactionSynchronizationManager.bindResource(
          getConnectionFactory()new LocallyExposedJmsResourceHolder(session));
    }
    try {
      executeListener(session, message);
    }
    finally {
      if (exposeResource) {
        TransactionSynchronizationManager.unbindResource(getConnectionFactory());
      }
    }
  }

  /**
   * Destroy the registered JMS Sessions and associated MessageConsumers.
   */
  protected void doShutdown() throws JMSException {
    logger.debug("Closing JMS MessageConsumers");
    for (MessageConsumer consumer : this.consumers) {
      JmsUtils.closeMessageConsumer(consumer);
    }
    logger.debug("Closing JMS Sessions");
    for (Session session : this.sessions) {
      JmsUtils.closeSession(session);
    }
  }


  //-------------------------------------------------------------------------
  // JMS 1.1 factory methods, potentially overridden for JMS 1.0.2
  //-------------------------------------------------------------------------

  /**
   * Create a JMS MessageConsumer for the given Session and Destination.
   <p>This implementation uses JMS 1.1 API.
   @param session the JMS Session to create a MessageConsumer for
   @param destination the JMS Destination to create a MessageConsumer for
   @return the new JMS MessageConsumer
   @throws JMSException if thrown by JMS API methods
   */
  protected MessageConsumer createConsumer(Session session, Destination destinationthrows JMSException {
    // Only pass in the NoLocal flag in case of a Topic:
    // Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException
    // in case of the NoLocal flag being specified for a Queue.
    if (isPubSubDomain()) {
      if (isSubscriptionDurable() && destination instanceof Topic) {
        return session.createDurableSubscriber(
            (Topicdestination, getDurableSubscriptionName(), getMessageSelector(), isPubSubNoLocal());
      }
      else {
        return session.createConsumer(destination, getMessageSelector(), isPubSubNoLocal());
      }
    }
    else {
      return session.createConsumer(destination, getMessageSelector());
    }
  }

}