1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.springframework.extensions.surf.task;
20
21 import java.util.HashMap;
22 import java.util.Queue;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 public class TaskManager
45 {
46 private static Log logger = LogFactory.getLog(TaskManager.class);
47
48
49 private TaskStarterThread taskStarterThread = null;
50
51
52 private HashMap<String, TaskWorkerThread> threads;
53
54
55 private Queue<AbstractTask> taskQueue = null;
56
57
58 private HashMap<String, AbstractTask> allTasks = null;
59
60
61 private int threadPoolSize = 5;
62
63
64
65 private int wakeupPeriod = 1000;
66
67 public TaskManager()
68 {
69 }
70
71 public void init()
72 {
73
74 taskQueue = new java.util.PriorityQueue<AbstractTask>();
75 allTasks = new HashMap<String, AbstractTask>();
76 threads = new HashMap<String, TaskWorkerThread>();
77
78
79
80 taskStarterThread = new TaskStarterThread(this);
81 taskStarterThread.start();
82
83
84 for (int i = 0; i < getThreadPoolSize(); i++)
85 {
86 TaskWorkerThread thread = new TaskWorkerThread(this);
87 getThreads().put(thread.getName(), thread);
88 thread.start();
89
90 if (logger.isDebugEnabled())
91 logger.debug("Added WorkerThread to pool (" + (i + 1) + " of " + getThreadPoolSize() + ")");
92 }
93 }
94
95 public void setThreadPoolSize(int threadPoolSize)
96 {
97 this.threadPoolSize = threadPoolSize;
98 }
99
100 public int getThreadPoolSize()
101 {
102 return this.threadPoolSize;
103 }
104
105 public void setWakeupPeriod(int wakeupPeriod)
106 {
107 this.wakeupPeriod = wakeupPeriod;
108 }
109
110 public int getWakeupPeriod()
111 {
112 return this.wakeupPeriod;
113 }
114
115 public int getThreadCount()
116 {
117 return threads.size();
118 }
119
120 public int getTaskQueueCount()
121 {
122 return taskQueue.size();
123 }
124
125 public synchronized String addTask(AbstractTask task)
126 {
127 taskQueue.add(task);
128 allTasks.put(task.getId(), task);
129
130
131 return task.getId();
132 }
133
134
135
136
137 public AbstractTask getTask(String taskId)
138 {
139 return (AbstractTask) allTasks.get(taskId);
140 }
141
142
143
144
145 public HashMap<String, AbstractTask> getAllTasks()
146 {
147 return allTasks;
148 }
149
150 public void remove(String taskId)
151 {
152 AbstractTask task = getTask(taskId);
153 if (task != null)
154 {
155 remove(task);
156 }
157 }
158
159 public void remove(AbstractTask task)
160 {
161 if (task != null && task.isFinished())
162 {
163 allTasks.remove(task.getId());
164 }
165 }
166
167 public Queue<AbstractTask> getTaskQueue()
168 {
169 return taskQueue;
170 }
171
172 public HashMap<String, TaskWorkerThread> getThreads()
173 {
174 return threads;
175 }
176
177 public void cancel(String taskId)
178 {
179 AbstractTask task = getTask(taskId);
180 if (task != null)
181 {
182 task.setStatus("Cancel called for the task");
183 task.cancel();
184 task.setStatus("Cancellation completed");
185 task.isCancelled = true;
186 }
187 }
188 }