001/**
002 * Copyright (C) 2012 FuseSource, Inc.
003 * http://fusesource.com
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *    http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.fusesource.hawtdispatch.transport;
019
020import org.fusesource.hawtdispatch.Dispatch;
021import org.fusesource.hawtdispatch.Task;
022
023import java.util.concurrent.TimeUnit;
024
025/**
026 * <p>A HeartBeatMonitor can be used to watch the read and write
027 * activity of a transport and raise events when the write side
028 * or read side has been idle too long.</p>
029 *
030 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
031 */
032public class HeartBeatMonitor {
033
034    Transport transport;
035    long initialWriteCheckDelay;
036    long initialReadCheckDelay;
037    long writeInterval;
038    long readInterval;
039
040    Task onKeepAlive = Dispatch.NOOP;
041    Task onDead = Dispatch.NOOP;
042
043    volatile short session = 0;
044
045    boolean readSuspendedInterval;
046    short readSuspendCount;
047
048    Object lock = new Object();
049
050    public void suspendRead() {
051        readSuspendCount++;
052        readSuspendedInterval = true;
053    }
054
055    public void resumeRead() {
056        readSuspendCount--;
057    }
058
059    private void schedule(final short session, long interval, final Task func) {
060        if (this.session == session) {
061            transport.getDispatchQueue().executeAfter(interval, TimeUnit.MILLISECONDS, new Task() {
062                public void run() {
063                    synchronized (lock) {
064                        if (HeartBeatMonitor.this.session == session) {
065                            func.run();
066                        }
067                    }
068                }
069            });
070        }
071    }
072
073    private void scheduleCheckWrites(final short session) {
074        final ProtocolCodec codec = transport.getProtocolCodec();
075        Task func;
076        if (codec == null) {
077            func = new Task() {
078                public void run() {
079                    scheduleCheckWrites(session);
080                }
081            };
082        } else {
083            final long lastWriteCounter = codec.getWriteCounter();
084            func = new Task() {
085                public void run() {
086                    if (lastWriteCounter == codec.getWriteCounter()) {
087                        onKeepAlive.run();
088                    }
089                    scheduleCheckWrites(session);
090                }
091            };
092        }
093        schedule(session, writeInterval, func);
094    }
095
096    private void scheduleCheckReads(final short session) {
097        final ProtocolCodec codec = transport.getProtocolCodec();
098        Task func;
099        if (codec == null) {
100            func = new Task() {
101                public void run() {
102                    scheduleCheckReads(session);
103                }
104            };
105        } else {
106            final long lastReadCounter = codec.getReadCounter();
107            func = new Task() {
108                public void run() {
109                    if (lastReadCounter == codec.getReadCounter() && !readSuspendedInterval && readSuspendCount == 0) {
110                        onDead.run();
111                    }
112                    readSuspendedInterval = false;
113                    scheduleCheckReads(session);
114                }
115            };
116        }
117        schedule(session, readInterval, func);
118    }
119
120    public void start() {
121        session++;
122        readSuspendedInterval = false;
123        if (writeInterval != 0) {
124            if (initialWriteCheckDelay != 0) {
125                transport.getDispatchQueue().executeAfter(initialWriteCheckDelay, TimeUnit.MILLISECONDS, new Task() {
126                    public void run() {
127                        scheduleCheckWrites(session);
128                    }
129                });
130            } else {
131                scheduleCheckWrites(session);
132            }
133        }
134        if (readInterval != 0) {
135            if (initialReadCheckDelay != 0) {
136                transport.getDispatchQueue().executeAfter(initialReadCheckDelay, TimeUnit.MILLISECONDS, new Task() {
137                    public void run() {
138                        scheduleCheckReads(session);
139                    }
140                });
141            } else {
142                scheduleCheckReads(session);
143            }
144        }
145    }
146
147    public void stop() {
148        synchronized (lock) {
149            session++;
150        }
151    }
152
153
154    public long getInitialReadCheckDelay() {
155        return initialReadCheckDelay;
156    }
157
158    public void setInitialReadCheckDelay(long initialReadCheckDelay) {
159        this.initialReadCheckDelay = initialReadCheckDelay;
160    }
161
162    public long getInitialWriteCheckDelay() {
163        return initialWriteCheckDelay;
164    }
165
166    public void setInitialWriteCheckDelay(long initialWriteCheckDelay) {
167        this.initialWriteCheckDelay = initialWriteCheckDelay;
168    }
169
170    public Task getOnDead() {
171        return onDead;
172    }
173
174    public void setOnDead(Task onDead) {
175        this.onDead = onDead;
176    }
177
178    public Task getOnKeepAlive() {
179        return onKeepAlive;
180    }
181
182    public void setOnKeepAlive(Task onKeepAlive) {
183        this.onKeepAlive = onKeepAlive;
184    }
185
186    public long getWriteInterval() {
187        return writeInterval;
188    }
189
190    public void setWriteInterval(long writeInterval) {
191        this.writeInterval = writeInterval;
192    }
193
194    public Transport getTransport() {
195        return transport;
196    }
197
198    public void setTransport(Transport transport) {
199        this.transport = transport;
200    }
201
202    public long getReadInterval() {
203        return readInterval;
204    }
205
206    public void setReadInterval(long readInterval) {
207        this.readInterval = readInterval;
208    }
209}