Open Source Repository

Home /spring/spring-context-3.0.5 | Repository Home



org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java
/*
 * Copyright 2002-2009 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.scheduling.concurrent;

import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.springframework.core.task.TaskRejectedException;
import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.TaskUtils;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;

/**
 * Implementation of Spring's {@link TaskScheduler} interface, wrapping
 * a native {@link java.util.concurrent.ScheduledThreadPoolExecutor}.
 *
 @author Juergen Hoeller
 @author Mark Fisher
 @since 3.0
 @see #setPoolSize
 @see #setThreadFactory
 @see #setErrorHandler
 */
public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
    implements TaskScheduler, SchedulingTaskExecutor {

  private volatile int poolSize = 1;

  private volatile ScheduledExecutorService scheduledExecutor;

  private volatile ErrorHandler errorHandler;


  /**
   * Set the ScheduledExecutorService's pool size.
   * Default is 1.
   */
  public void setPoolSize(int poolSize) {
    Assert.isTrue(poolSize > 0"'poolSize' must be 1 or higher");
    this.poolSize = poolSize;
  }

  /**
   * Provide an {@link ErrorHandler} strategy.
   */
  public void setErrorHandler(ErrorHandler errorHandler) {
    Assert.notNull(errorHandler, "'errorHandler' must not be null");
    this.errorHandler = errorHandler;
  }

  protected ExecutorService initializeExecutor(
      ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

    this.scheduledExecutor = createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler);
    return this.scheduledExecutor;
  }

  /**
   * Create a new {@link ScheduledExecutorService} instance.
   <p>The default implementation creates a {@link ScheduledThreadPoolExecutor}.
   * Can be overridden in subclasses to provide custom {@link ScheduledExecutorService} instances.
   @param poolSize the specified pool size
   @param threadFactory the ThreadFactory to use
   @param rejectedExecutionHandler the RejectedExecutionHandler to use
   @return a new ScheduledExecutorService instance
   @see #afterPropertiesSet()
   @see java.util.concurrent.ScheduledThreadPoolExecutor
   */
  protected ScheduledExecutorService createExecutor(
      int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

    return new ScheduledThreadPoolExecutor(poolSize, threadFactory, rejectedExecutionHandler);
  }

  /**
   * Return the underlying ScheduledExecutorService for native access.
   @return the underlying ScheduledExecutorService (never <code>null</code>)
   @throws IllegalStateException if the ThreadPoolTaskScheduler hasn't been initialized yet
   */
  public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException {
    Assert.state(this.scheduledExecutor != null, "ThreadPoolTaskScheduler not initialized");
    return this.scheduledExecutor;
  }


  // SchedulingTaskExecutor implementation

  public void execute(Runnable task) {
    Executor executor = getScheduledExecutor();
    try {
      executor.execute(errorHandlingTask(task, false));
    }
    catch (RejectedExecutionException ex) {
      throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
    }
  }

  public void execute(Runnable task, long startTimeout) {
    execute(task);
  }

  public Future<?> submit(Runnable task) {
    ExecutorService executor = getScheduledExecutor();
    try {
      return executor.submit(errorHandlingTask(task, false));
    }
    catch (RejectedExecutionException ex) {
      throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
    }
  }

  public <T> Future<T> submit(Callable<T> task) {
    ExecutorService executor = getScheduledExecutor();
    try {
      if (this.errorHandler != null) {
        task = new DelegatingErrorHandlingCallable<T>(task, this.errorHandler);
      }
      return executor.submit(task);
    }
    catch (RejectedExecutionException ex) {
      throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
    }
  }

  public boolean prefersShortLivedTasks() {
    return true;
  }


  // TaskScheduler implementation

  public ScheduledFuture schedule(Runnable task, Trigger trigger) {
    ScheduledExecutorService executor = getScheduledExecutor();
    try {
      ErrorHandler errorHandler = this.errorHandler != null ?
          this.errorHandler : TaskUtils.getDefaultErrorHandler(true)
      return new ReschedulingRunnable(task, trigger, executor, errorHandler).schedule();
    }
    catch (RejectedExecutionException ex) {
      throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
    }
  }

  public ScheduledFuture schedule(Runnable task, Date startTime) {
    ScheduledExecutorService executor = getScheduledExecutor();
    long initialDelay = startTime.getTime() - System.currentTimeMillis();
    try {
      return executor.schedule(errorHandlingTask(task, false), initialDelay, TimeUnit.MILLISECONDS);
    }
    catch (RejectedExecutionException ex) {
      throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
    }
  }

  public ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period) {
    ScheduledExecutorService executor = getScheduledExecutor();
    long initialDelay = startTime.getTime() - System.currentTimeMillis();
    try {
      return executor.scheduleAtFixedRate(errorHandlingTask(task, true), initialDelay, period, TimeUnit.MILLISECONDS);
    }
    catch (RejectedExecutionException ex) {
      throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
    }
  }

  public ScheduledFuture scheduleAtFixedRate(Runnable task, long period) {
    ScheduledExecutorService executor = getScheduledExecutor();
    try {
      return executor.scheduleAtFixedRate(errorHandlingTask(task, true)0, period, TimeUnit.MILLISECONDS);
    }
    catch (RejectedExecutionException ex) {
      throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
    }
  }

  public ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay) {
    ScheduledExecutorService executor = getScheduledExecutor();
    long initialDelay = startTime.getTime() - System.currentTimeMillis();
    try {
      return executor.scheduleWithFixedDelay(errorHandlingTask(task, true), initialDelay, delay, TimeUnit.MILLISECONDS);
    }
    catch (RejectedExecutionException ex) {
      throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
    }
  }

  public ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay) {
    ScheduledExecutorService executor = getScheduledExecutor();
    try {
      return executor.scheduleWithFixedDelay(errorHandlingTask(task, true)0, delay, TimeUnit.MILLISECONDS);
    }
    catch (RejectedExecutionException ex) {
      throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
    }
  }

  private Runnable errorHandlingTask(Runnable task, boolean isRepeatingTask) {
    return TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, isRepeatingTask);
  }


  private static class DelegatingErrorHandlingCallable<V> implements Callable<V> {

    private final Callable<V> delegate;

    private final ErrorHandler errorHandler;

    DelegatingErrorHandlingCallable(Callable<V> delegate, ErrorHandler errorHandler) {
      this.delegate = delegate;
      this.errorHandler = errorHandler;
    }

    public V call() throws Exception {
      try {
        return delegate.call();
      }
      catch (Throwable t) {
        this.errorHandler.handleError(t);
        return null;
      }
    }
  }

}