View Javadoc

1   /**
2    * Copyright (C) 2005-2009 Alfresco Software Limited.
3    *
4    * This file is part of the Spring Surf Extension project.
5    *
6    * Licensed under the Apache License, Version 2.0 (the "License");
7    * you may not use this file except in compliance with the License.
8    * You may obtain a copy of the License at
9    *
10   *  http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.springframework.extensions.surf.task;
20  
21  import java.util.HashMap;
22  import java.util.Queue;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  
27  /**
28   * Basic implementation of a task manager for use in asynchronous 
29   * processing and monitoring of tasks.
30   * 
31   * Tasks are added to the task manager.  When added, they are 
32   * assigned a reference id so that subsequent calls to the task
33   * manager can check on the task status.
34   * 
35   * A task starter thread wakes up occasionally and checks whether 
36   * there are any tasks waiting to be processed.
37   * 
38   * If a task is found, it is attached to a task worker thread and 
39   * then executed.  When the task worker thread completes, it is 
40   * handed back to the task worker thread pool.
41   * 
42   * @author muzquiano
43   */
44  public class TaskManager
45  {
46      private static Log logger = LogFactory.getLog(TaskManager.class);
47  
48      // the thread launcher thread
49      private TaskStarterThread taskStarterThread = null;
50      
51      // our collection of worker threads
52      private HashMap<String, TaskWorkerThread> threads;
53  
54      // tasks that are queued up for us to do
55      private Queue<AbstractTask> taskQueue = null;
56  
57      // all tasks that are running
58      private HashMap<String, AbstractTask> allTasks = null;
59  
60      // maximum number of tasks we should run concurrently
61      private int threadPoolSize = 5;
62      
63      // how often the nightcrawler should wake up to check
64      // if there are new jobs to process
65      private int wakeupPeriod = 1000;
66  
67      public TaskManager()
68      {
69      }
70      
71      public void init()
72      {
73          // initialize all collections
74          taskQueue = new java.util.PriorityQueue<AbstractTask>();
75          allTasks = new HashMap<String, AbstractTask>();
76          threads = new HashMap<String, TaskWorkerThread>();        
77  
78          // create the nightkeeper thread that will make sure
79          // that tasks are being processed
80          taskStarterThread = new TaskStarterThread(this);
81          taskStarterThread.start();
82  
83          // stock our available threads
84          for (int i = 0; i < getThreadPoolSize(); i++)
85          {
86              TaskWorkerThread thread = new TaskWorkerThread(this);
87              getThreads().put(thread.getName(), thread);
88              thread.start();
89  
90              if (logger.isDebugEnabled())
91                  logger.debug("Added WorkerThread to pool (" + (i + 1) + " of " + getThreadPoolSize() + ")");
92          }        
93      }
94      
95      public void setThreadPoolSize(int threadPoolSize)
96      {
97          this.threadPoolSize = threadPoolSize;
98      }
99      
100     public int getThreadPoolSize()
101     {
102         return this.threadPoolSize;
103     }
104     
105     public void setWakeupPeriod(int wakeupPeriod)
106     {
107         this.wakeupPeriod = wakeupPeriod;
108     }
109     
110     public int getWakeupPeriod()
111     {
112         return this.wakeupPeriod;
113     }
114 
115     public int getThreadCount()
116     {
117         return threads.size();
118     }
119 
120     public int getTaskQueueCount()
121     {
122         return taskQueue.size();
123     }
124 
125     public synchronized String addTask(AbstractTask task)
126     {
127         taskQueue.add(task);
128         allTasks.put(task.getId(), task);
129 
130         // return the task id
131         return task.getId();
132     }
133 
134     /**
135      * Returns a task that is running or has finished running
136      */
137     public AbstractTask getTask(String taskId)
138     {
139         return (AbstractTask) allTasks.get(taskId);
140     }
141 
142     /**
143      * Returns all of the tasks
144      */
145     public HashMap<String, AbstractTask> getAllTasks()
146     {
147         return allTasks;
148     }
149 
150     public void remove(String taskId)
151     {
152         AbstractTask task = getTask(taskId);
153         if (task != null)
154         {
155             remove(task);
156         }
157     }
158 
159     public void remove(AbstractTask task)
160     {
161         if (task != null && task.isFinished())
162         {
163             allTasks.remove(task.getId());
164         }
165     }
166 
167     public Queue<AbstractTask> getTaskQueue()
168     {
169         return taskQueue;
170     }
171 
172     public HashMap<String, TaskWorkerThread> getThreads()
173     {
174         return threads;
175     }
176 
177     public void cancel(String taskId)
178     {
179         AbstractTask task = getTask(taskId);
180         if (task != null)
181         {
182             task.setStatus("Cancel called for the task");
183             task.cancel();
184             task.setStatus("Cancellation completed");
185             task.isCancelled = true;
186         }
187     }
188 }