Open Source Repository

Home /spring/spring-context-3.0.5 | Repository Home



org/springframework/scheduling/backportconcurrent/ThreadPoolTaskExecutor.java
/*
 * Copyright 2002-2009 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.scheduling.backportconcurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.Executor;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionException;
import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionHandler;
import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.util.Assert;

/**
 * JavaBean that allows for configuring a JSR-166 backport
 {@link edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor} in bean
 * style (through its "corePoolSize", "maxPoolSize", "keepAliveSeconds", "queueCapacity"
 * properties), exposing it as a Spring {@link org.springframework.core.task.TaskExecutor}.
 * This is an alternative to configuring a ThreadPoolExecutor instance directly using
 * constructor injection, with a separate {@link ConcurrentTaskExecutor} adapter wrapping it.
 *
 <p>For any custom needs, in particular for defining a
 {@link edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor},
 * it is recommended to use a straight definition of the Executor instance or a
 * factory method definition that points to the JSR-166 backport
 {@link edu.emory.mathcs.backport.java.util.concurrent.Executors} class.
 * To expose such a raw Executor as a Spring {@link org.springframework.core.task.TaskExecutor},
 * simply wrap it with a {@link ConcurrentTaskExecutor} adapter.
 *
 <p><b>NOTE:</b> This class implements Spring's
 {@link org.springframework.core.task.TaskExecutor} interface (and hence implicitly
 * the standard Java 5 {@link java.util.concurrent.Executor} interface) as well as
 * the JSR-166 {@link edu.emory.mathcs.backport.java.util.concurrent.Executor}
 * interface, with the former being the primary interface, the other just
 * serving as secondary convenience. For this reason, the exception handling
 * follows the TaskExecutor contract rather than the backport Executor contract, in
 * particular regarding the {@link org.springframework.core.task.TaskRejectedException}.
 *
 @author Juergen Hoeller
 @since 2.0.3
 @see org.springframework.core.task.TaskExecutor
 @see edu.emory.mathcs.backport.java.util.concurrent.Executor
 @see edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor
 @see edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor
 @see edu.emory.mathcs.backport.java.util.concurrent.Executors
 @see ConcurrentTaskExecutor
 */
public class ThreadPoolTaskExecutor extends CustomizableThreadFactory
    implements SchedulingTaskExecutor, Executor, BeanNameAware, InitializingBean, DisposableBean {

  protected final Log logger = LogFactory.getLog(getClass());

  private final Object poolSizeMonitor = new Object();

  private int corePoolSize = 1;

  private int maxPoolSize = Integer.MAX_VALUE;

  private int keepAliveSeconds = 60;

  private boolean allowCoreThreadTimeOut = false;

  private int queueCapacity = Integer.MAX_VALUE;

  private ThreadFactory threadFactory = this;

  private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();

  private boolean waitForTasksToCompleteOnShutdown = false;

  private boolean threadNamePrefixSet = false;

  private String beanName;

  private ThreadPoolExecutor threadPoolExecutor;


  /**
   * Set the ThreadPoolExecutor's core pool size.
   * Default is 1.
   <p><b>This setting can be modified at runtime, for example through JMX.</b>
   */
  public void setCorePoolSize(int corePoolSize) {
    synchronized (this.poolSizeMonitor) {
      this.corePoolSize = corePoolSize;
      if (this.threadPoolExecutor != null) {
        this.threadPoolExecutor.setCorePoolSize(corePoolSize);
      }
    }
  }

  /**
   * Return the ThreadPoolExecutor's core pool size.
   */
  public int getCorePoolSize() {
    synchronized (this.poolSizeMonitor) {
      return this.corePoolSize;
    }
  }

  /**
   * Set the ThreadPoolExecutor's maximum pool size.
   * Default is <code>Integer.MAX_VALUE</code>.
   <p><b>This setting can be modified at runtime, for example through JMX.</b>
   */
  public void setMaxPoolSize(int maxPoolSize) {
    synchronized (this.poolSizeMonitor) {
      this.maxPoolSize = maxPoolSize;
      if (this.threadPoolExecutor != null) {
        this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
      }
    }
  }

  /**
   * Return the ThreadPoolExecutor's maximum pool size.
   */
  public int getMaxPoolSize() {
    synchronized (this.poolSizeMonitor) {
      return this.maxPoolSize;
    }
  }

  /**
   * Set the ThreadPoolExecutor's keep-alive seconds.
   * Default is 60.
   <p><b>This setting can be modified at runtime, for example through JMX.</b>
   */
  public void setKeepAliveSeconds(int keepAliveSeconds) {
    synchronized (this.poolSizeMonitor) {
      this.keepAliveSeconds = keepAliveSeconds;
      if (this.threadPoolExecutor != null) {
        this.threadPoolExecutor.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS);
      }
    }
  }

  /**
   * Return the ThreadPoolExecutor's keep-alive seconds.
   */
  public int getKeepAliveSeconds() {
    synchronized (this.poolSizeMonitor) {
      return this.keepAliveSeconds;
    }
  }

  /**
   * Specify whether to allow core threads to time out. This enables dynamic
   * growing and shrinking even in combination with a non-zero queue (since
   * the max pool size will only grow once the queue is full).
   <p>Default is "false". Note that this feature is only available on
   * backport-concurrent 3.0 or above (based on the code in Java 6).
   @see edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(boolean)
   */
  public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
    this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
  }

  /**
   * Set the capacity for the ThreadPoolExecutor's BlockingQueue.
   * Default is <code>Integer.MAX_VALUE</code>.
   <p>Any positive value will lead to a LinkedBlockingQueue instance;
   * any other value will lead to a SynchronousQueue instance.
   @see edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue
   @see edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue
   */
  public void setQueueCapacity(int queueCapacity) {
    this.queueCapacity = queueCapacity;
  }

  /**
   * Set the ThreadFactory to use for the ThreadPoolExecutor's thread pool.
   <p>Default is this executor itself (i.e. the factory that this executor
   * inherits from). See {@link org.springframework.util.CustomizableThreadCreator}'s
   * javadoc for available bean properties.
   @see #setThreadPriority
   @see #setDaemon
   */
  public void setThreadFactory(ThreadFactory threadFactory) {
    this.threadFactory = (threadFactory != null ? threadFactory : this);
  }

  /**
   * Set the RejectedExecutionHandler to use for the ThreadPoolExecutor.
   * Default is the ThreadPoolExecutor's default abort policy.
   @see edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.AbortPolicy
   */
  public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) {
    this.rejectedExecutionHandler =
        (rejectedExecutionHandler != null ? rejectedExecutionHandler : new ThreadPoolExecutor.AbortPolicy());
  }

  /**
   * Set whether to wait for scheduled tasks to complete on shutdown.
   <p>Default is "false". Switch this to "true" if you prefer
   * fully completed tasks at the expense of a longer shutdown phase.
   @see edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor#shutdown()
   @see edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor#shutdownNow()
   */
  public void setWaitForTasksToCompleteOnShutdown(boolean waitForJobsToCompleteOnShutdown) {
    this.waitForTasksToCompleteOnShutdown = waitForJobsToCompleteOnShutdown;
  }

  @Override
  public void setThreadNamePrefix(String threadNamePrefix) {
    super.setThreadNamePrefix(threadNamePrefix);
    this.threadNamePrefixSet = true;
  }

  public void setBeanName(String name) {
    this.beanName = name;
  }


  /**
   * Calls <code>initialize()</code> after the container applied all property values.
   @see #initialize()
   */
  public void afterPropertiesSet() {
    initialize();
  }

  /**
   * Creates the BlockingQueue and the ThreadPoolExecutor.
   @see #createQueue
   */
  public void initialize() {
    if (logger.isInfoEnabled()) {
      logger.info("Initializing ThreadPoolExecutor" (this.beanName != null " '" this.beanName + "'" ""));
    }
    if (!this.threadNamePrefixSet && this.beanName != null) {
      setThreadNamePrefix(this.beanName + "-");
    }
    BlockingQueue queue = createQueue(this.queueCapacity);
    this.threadPoolExecutor = new ThreadPoolExecutor(
        this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
        queue, this.threadFactory, this.rejectedExecutionHandler);
    if (this.allowCoreThreadTimeOut) {
      this.threadPoolExecutor.allowCoreThreadTimeOut(true);
    }
  }

  /**
   * Create the BlockingQueue to use for the ThreadPoolExecutor.
   <p>A LinkedBlockingQueue instance will be created for a positive
   * capacity value; a SynchronousQueue else.
   @param queueCapacity the specified queue capacity
   @return the BlockingQueue instance
   @see edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue
   @see edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue
   */
  protected BlockingQueue createQueue(int queueCapacity) {
    if (queueCapacity > 0) {
      return new LinkedBlockingQueue(queueCapacity);
    }
    else {
      return new SynchronousQueue();
    }
  }

  /**
   * Return the underlying ThreadPoolExecutor for native access.
   @return the underlying ThreadPoolExecutor (never <code>null</code>)
   @throws IllegalStateException if the ThreadPoolTaskExecutor hasn't been initialized yet
   */
  public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
    Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized");
    return this.threadPoolExecutor;
  }


  /**
   * Implementation of both the JSR-166 backport Executor interface and the Spring
   * TaskExecutor interface, delegating to the ThreadPoolExecutor instance.
   @see edu.emory.mathcs.backport.java.util.concurrent.Executor#execute(Runnable)
   @see org.springframework.core.task.TaskExecutor#execute(Runnable)
   */
  public void execute(Runnable task) {
    Executor executor = getThreadPoolExecutor();
    try {
      executor.execute(task);
    }
    catch (RejectedExecutionException ex) {
      throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
    }
  }

  public void execute(Runnable task, long startTimeout) {
    execute(task);
  }

  public Future<?> submit(Runnable task) {
    FutureTask<Object> future = new FutureTask<Object>(task, null);
    execute(future);
    return future;
  }

  public <T> Future<T> submit(Callable<T> task) {
    FutureTask<T> future = new FutureTask<T>(task);
    execute(future);
    return future;
  }

  /**
   * This task executor prefers short-lived work units.
   */
  public boolean prefersShortLivedTasks() {
    return true;
  }


  /**
   * Return the current pool size.
   @see edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor#getPoolSize()
   */
  public int getPoolSize() {
    return getThreadPoolExecutor().getPoolSize();
  }

  /**
   * Return the number of currently active threads.
   @see edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor#getActiveCount()
   */
  public int getActiveCount() {
    return getThreadPoolExecutor().getActiveCount();
  }


  /**
   * Calls <code>shutdown</code> when the BeanFactory destroys
   * the task executor instance.
   @see #shutdown()
   */
  public void destroy() {
    shutdown();
  }

  /**
   * Perform a shutdown on the ThreadPoolExecutor.
   @see edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor#shutdown()
   */
  public void shutdown() {
    if (logger.isInfoEnabled()) {
      logger.info("Shutting down ThreadPoolExecutor" (this.beanName != null " '" this.beanName + "'" ""));
    }
    if (this.waitForTasksToCompleteOnShutdown) {
      this.threadPoolExecutor.shutdown();
    }
    else {
      this.threadPoolExecutor.shutdownNow();
    }
  }

}