EMMA Coverage Report (generated Tue Feb 12 22:23:49 ICT 2008)
[all classes][net.sourceforge.hiveevents]

COVERAGE SUMMARY FOR SOURCE FILE [ChannelImpl.java]

nameclass, %method, %block, %line, %
ChannelImpl.java86%  (6/7)75%  (30/40)67%  (365/542)69%  (91.5/133)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class ChannelImpl100% (1/1)79%  (15/19)62%  (254/407)64%  (58.5/92)
block (): void 0%   (0/1)0%   (0/12)0%   (0/3)
unblock (): void 0%   (0/1)0%   (0/43)0%   (0/9)
unblockPullConsumer (int): void 0%   (0/1)0%   (0/22)0%   (0/6)
unregisterAllConsumers (): void 0%   (0/1)0%   (0/7)0%   (0/3)
logEvent (String, Object): void 100% (1/1)14%  (4/28)50%  (1.5/3)
purgeConsumers (): void 100% (1/1)30%  (8/27)33%  (2/6)
unregisterConsumer (int): void 100% (1/1)64%  (23/36)78%  (7/9)
push (Object): void 100% (1/1)81%  (25/31)78%  (7/9)
registerPushConsumer (int, Filter, Consumer): int 100% (1/1)83%  (34/41)70%  (7/10)
<static initializer> 100% (1/1)100% (4/4)100% (1/1)
ChannelImpl (String, int, Class, boolean): void 100% (1/1)100% (47/47)100% (13/13)
getConsumer (int): ChannelImpl$ConsumerInfo 100% (1/1)100% (7/7)100% (1/1)
getName (): String 100% (1/1)100% (3/3)100% (1/1)
pull (int): Object [] 100% (1/1)100% (5/5)100% (1/1)
pull (int, long): Object [] 100% (1/1)100% (35/35)100% (7/7)
registerConsumer (ChannelImpl$ConsumerInfo): void 100% (1/1)100% (26/26)100% (5/5)
registerPullConsumer (): int 100% (1/1)100% (6/6)100% (1/1)
registerPullConsumer (Filter): int 100% (1/1)100% (19/19)100% (3/3)
registerPushConsumer (int, Consumer): int 100% (1/1)100% (8/8)100% (1/1)
     
class ChannelImpl$10%   (0/1)100% (0/0)100% (0/0)100% (0/0)
     
class ChannelImpl$PriorityComparator100% (1/1)100% (3/3)73%  (16/22)80%  (4/5)
compare (ChannelImpl$ConsumerInfo, ChannelImpl$ConsumerInfo): int 100% (1/1)62%  (10/16)75%  (3/4)
ChannelImpl$PriorityComparator (): void 100% (1/1)100% (3/3)100% (1/1)
ChannelImpl$PriorityComparator (ChannelImpl$1): void 100% (1/1)100% (3/3)100% (1/1)
     
class ChannelImpl$PullConsumerInfo100% (1/1)67%  (4/6)78%  (38/49)73%  (8/11)
pull (Class): Object [] 0%   (0/1)0%   (0/7)0%   (0/1)
unblock (): void 0%   (0/1)0%   (0/4)0%   (0/2)
ChannelImpl$PullConsumerInfo (int, int, Filter): void 100% (1/1)100% (11/11)100% (3/3)
extract (Class, List): Object [] 100% (1/1)100% (9/9)100% (1/1)
pull (Class, long): Object [] 100% (1/1)100% (8/8)100% (1/1)
push (Object): void 100% (1/1)100% (10/10)100% (3/3)
     
class ChannelImpl$AbstractConsumerInfo100% (1/1)57%  (4/7)83%  (20/24)73%  (8/11)
pull (Class, long): Object [] 0%   (0/1)0%   (0/2)0%   (0/1)
push (Object): void 0%   (0/1)0%   (0/1)0%   (0/1)
unblock (): void 0%   (0/1)0%   (0/1)0%   (0/1)
ChannelImpl$AbstractConsumerInfo (int, int, Filter): void 100% (1/1)100% (12/12)100% (5/5)
getId (): int 100% (1/1)100% (3/3)100% (1/1)
getPriority (): int 100% (1/1)100% (3/3)100% (1/1)
pull (Class): Object [] 100% (1/1)100% (2/2)100% (1/1)
     
class ChannelImpl$WeakPushConsumer100% (1/1)67%  (2/3)86%  (18/21)88%  (7/8)
getId (): int 0%   (0/1)0%   (0/3)0%   (0/1)
ChannelImpl$WeakPushConsumer (int, Consumer, ReferenceQueue): void 100% (1/1)100% (8/8)100% (3/3)
push (Object): void 100% (1/1)100% (10/10)100% (4/4)
     
class ChannelImpl$PushConsumerInfo100% (1/1)100% (2/2)100% (19/19)100% (6/6)
ChannelImpl$PushConsumerInfo (int, int, Consumer, Filter): void 100% (1/1)100% (9/9)100% (3/3)
push (Object): void 100% (1/1)100% (10/10)100% (3/3)

1//  Copyright 2004-2007 Jean-Francois Poilpret
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14 
15package net.sourceforge.hiveevents;
16 
17import java.lang.ref.ReferenceQueue;
18import java.lang.ref.WeakReference;
19import java.lang.reflect.Array;
20import java.util.ArrayList;
21import java.util.Collections;
22import java.util.Comparator;
23import java.util.HashMap;
24import java.util.Iterator;
25import java.util.List;
26import java.util.Map;
27 
28import org.apache.commons.logging.Log;
29import org.apache.commons.logging.LogFactory;
30 
31import net.sourceforge.hiveutils.collections.Queue;
32import net.sourceforge.hiveutils.collections.impl.QueueImpl;
33 
34/**
35 * Thread-unsafe implementation of an Event Channel.
36 * @author Jean-Francois Poilpret
37 */
38public class ChannelImpl<T> implements Channel<T>
39{
40        static private final Log        _logger = LogFactory.getLog(ChannelImpl.class);
41        
42        public ChannelImpl(        String                name, 
43                                                int                        pullConsumerPriority, 
44                                                Class<T>        clazz, 
45                                                boolean                logEvents)
46        {
47                _name = name;
48                _pullConsumerPriority = pullConsumerPriority;
49                _clazz = clazz;
50                _logEvents = logEvents;
51        }
52        
53        public String        getName()
54        {
55                return _name;
56        }
57 
58        public void                block()
59        {
60                if (_blocked++ == 0)
61                {
62                        _blockedEvents.clear();
63                }
64        }
65        
66        public void                unblock()
67        {
68                if (_blocked == 0)
69                {
70                        _logger.warn("unblock() channel <" + _name + "> is not blocked");
71                        return;
72                }
73                if (--_blocked == 0)
74                {
75                        for (T event: _blockedEvents)
76                        {
77                                push(event);
78                        }
79                        _blockedEvents.clear();
80                }
81        }
82 
83        // For All consumers
84        public void                unregisterConsumer(int idConsumer)
85        {
86                if (_consumers.remove(idConsumer) == null)
87                {
88                        _logger.warn("unregisterConsumer() unexisting idConsumer <" + idConsumer + ">");
89                        return;
90                }
91                Iterator<ConsumerInfo<T>> i = _priorityConsumers.iterator();
92                while (i.hasNext())
93                {
94                        if (i.next().getId() == idConsumer)
95                        {
96                                i.remove();
97                                break;
98                        }
99                }
100        }
101 
102        public void                unregisterAllConsumers()
103        {
104                _priorityConsumers.clear();
105                _consumers.clear();
106        }
107 
108        // For Push consumers
109        public int                registerPushConsumer(int priority, Consumer<T> consumer)
110        {
111                return registerPushConsumer(priority, new PassAllFilter<T>(), consumer);
112        }
113 
114        public int                registerPushConsumer(int priority, Filter<T> filter, Consumer<T> consumer)
115        {
116                if (consumer == null)
117                {
118                        _logger.error("registerPushConsumer() consumer must not be null");
119                        return -1;
120                }
121                _lastId++;
122                Consumer<T> actualConsumer;
123                if (!(consumer instanceof PersistentConsumer))
124                {
125                        actualConsumer = new WeakPushConsumer<T>(_lastId, consumer, _queue);
126                }
127                else
128                {
129                        actualConsumer = consumer;
130                }
131                registerConsumer(new PushConsumerInfo<T>(_lastId, priority, actualConsumer, filter));
132                return _lastId;
133        }
134 
135        // For Pull consumers
136        public int                registerPullConsumer()
137        {
138                return registerPullConsumer(new PassAllFilter<T>());
139        }
140 
141        public int                registerPullConsumer(Filter<T> filter)
142        {
143                _lastId++;
144                registerConsumer(new PullConsumerInfo<T>(_lastId, _pullConsumerPriority, filter));
145                return _lastId;
146        }
147        
148        @SuppressWarnings("unchecked") 
149        protected void        purgeConsumers()
150        {
151                WeakPushConsumer reference;
152                while ((reference = (WeakPushConsumer) _queue.poll()) != null)
153                {
154                        int id = reference.getId();
155                        _logger.debug("purgeConsumers() removing consumer id <" + id + ">");
156                        // Remove completely
157                        unregisterConsumer(id);
158                }
159        }
160        
161        protected void        registerConsumer(ConsumerInfo<T> info)
162        {
163                _consumers.put(info.getId(), info);
164                int index = Collections.binarySearch(_priorityConsumers, info, _compare);
165                index = -index - 1;
166                _priorityConsumers.add(index, info);
167        }
168 
169        public void                push(T event)
170        {
171                logEvent("push", event);
172                if (_blocked > 0)
173                {
174                        _blockedEvents.add(event);
175                        return;
176                }
177 
178                purgeConsumers();
179                for (ConsumerInfo<T> info: _priorityConsumers)
180                {
181                        info.push(event);
182                }
183        }
184        
185        protected void        logEvent(String message, T event)
186        {
187                if (_logEvents && _logger.isDebugEnabled())
188                {
189                        _logger.debug("Channel<" + _name + "> " + message + "(" + event + ")");
190                }
191        }
192        
193        public T[]        pull(int idConsumer)
194        {
195                return pull(idConsumer, -1L);
196        }
197        
198        public T[]        pull(int idConsumer, long timeout)
199        {
200                ConsumerInfo<T> info = getConsumer(idConsumer);
201                if (info == null)
202                {
203                        _logger.warn("pull() unexisting idConsumer <" + idConsumer + ">");
204                        return null;
205                }
206                if (timeout < 0)
207                {
208                        return info.pull(_clazz);
209                }
210                else
211                {
212                        return info.pull(_clazz, timeout);
213                }
214        }
215        
216        public void                unblockPullConsumer(int idConsumer)
217        {
218                ConsumerInfo info = getConsumer(idConsumer);
219                if (info == null)
220                {
221                        _logger.warn("unblockPullConsumer() unexisting idConsumer <" + idConsumer + ">");
222                        return;
223                }
224                info.unblock();
225        }
226 
227        protected ConsumerInfo<T>        getConsumer(int idConsumer)
228        {
229                return _consumers.get(idConsumer);
230        }
231        
232        protected interface ConsumerInfo<T>
233        {
234                public void                unblock();
235                public void                push(T event);
236                public T[]                pull(Class<T> clazz);
237                public T[]                pull(Class<T> clazz, long timeout);
238                public int                getPriority();
239                public int                getId();
240        }
241 
242        static abstract protected class AbstractConsumerInfo<T> implements ConsumerInfo<T>
243        {
244                protected AbstractConsumerInfo(int id, int priority, Filter<T> filter)
245                {
246                        _id = id;
247                        _priority = priority;
248                        _filter = filter;
249                }
250                
251                public void                unblock()
252                {
253                }
254                public void                push(T event)
255                {
256                }
257                public T[]        pull(Class<T> clazz)
258                {
259                        return null;
260                }
261                public T[]        pull(Class<T> clazz, long timeout)
262                {
263                        return null;
264                }
265                public int                getPriority()
266                {
267                        return _priority;
268                }
269                public int                getId()
270                {
271                        return _id;
272                }
273                
274                private final int                        _id;
275                private final int                        _priority;
276                protected final Filter<T>        _filter;
277        }
278 
279        static private class PushConsumerInfo<T> extends AbstractConsumerInfo<T>
280        {
281                public PushConsumerInfo(int id, int priority, Consumer<T> consumer, Filter<T> filter)
282                {
283                        super(id, priority, filter);
284                        _consumer = consumer;
285                }
286 
287                public void                push(T event)
288                {
289                        if (_filter.passEvent(event))
290                        {
291                                _consumer.push(event);
292                        }
293                }
294 
295                private final Consumer<T>        _consumer;
296        }
297 
298        static private class WeakPushConsumer<T>
299                extends WeakReference<Consumer<T>> 
300                implements Consumer<T>
301        {
302                public WeakPushConsumer(int                                                        id, 
303                                                                Consumer<T>                                        consumer, 
304                                                                ReferenceQueue<Consumer<T>>        queue)
305                {
306                        super(consumer, queue);
307                        _id = id;
308                }
309                
310                public void                push(T event)
311                {
312                        Consumer<T> consumer = get();
313                        if (consumer != null)
314                        {
315                                consumer.push(event);
316                        }
317                }
318 
319                public int        getId()
320                {
321                        return _id;
322                }
323                
324                private final int        _id;
325        }
326        
327        static private class PullConsumerInfo<T> extends AbstractConsumerInfo<T>
328        {
329                public PullConsumerInfo(int id, int priority, Filter<T> filter)
330                {
331                        super(id, priority, filter);
332                }
333 
334                public void                unblock()
335                {
336                        _queue.unblock();
337                }
338                public void                push(T event)
339                {
340                        if (_filter.passEvent(event))
341                        {
342                                _queue.add(event);
343                        }
344                }
345                public T[]        pull(Class<T> clazz)
346                {
347                        return extract(clazz, _queue.take());
348                }
349                public T[]        pull(Class<T> clazz, long timeout)
350                {
351                        return extract(clazz, _queue.take(timeout));
352                }
353                @SuppressWarnings("unchecked")
354                private T[] extract(Class<T> clazz, List<T> events)
355                {
356                        return events.toArray((T[]) Array.newInstance(clazz, events.size()));
357                }
358 
359                private final Queue<T>        _queue = new QueueImpl<T>();
360        }
361 
362        static private class PriorityComparator<T> implements Comparator<ConsumerInfo<T>>
363        {
364                public int        compare(ConsumerInfo<T> c1, ConsumerInfo<T> c2)
365                {
366                        int diff = c1.getPriority() - c2.getPriority();
367                        if (diff != 0)
368                        {
369                                return diff;
370                        }
371                        return c1.getId() - c2.getId();
372                }
373        }
374        
375        private final String                                                _name;
376        private final int                                                        _pullConsumerPriority;
377        private final Class<T>                                                _clazz;
378        private final boolean                                                _logEvents;
379        private int                                                                        _lastId = 0;
380 
381        // List of all consumers sorted by priority (used by push())
382        private final List<ConsumerInfo<T>>                        _priorityConsumers = 
383                                                                                                        new ArrayList<ConsumerInfo<T>>();
384        private final Comparator<ConsumerInfo<T>>        _compare = new PriorityComparator<T>();
385        // Map id -> ConsumerInfo
386        private final Map<Integer, ConsumerInfo<T>>        _consumers = 
387                                                                                                        new HashMap<Integer, ConsumerInfo<T>>();
388        private final ReferenceQueue<Consumer<T>>        _queue = 
389                                                                                                        new ReferenceQueue<Consumer<T>>();
390        
391        private int                                                                        _blocked = 0;
392        private List<T>                                                                _blockedEvents = new ArrayList<T>();
393}

[all classes][net.sourceforge.hiveevents]
EMMA 2.0.5312 (C) Vladimir Roubtsov