001/** 002 * Copyright (C) 2012 FuseSource, Inc. 003 * http://fusesource.com 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.fusesource.hawtdispatch.transport; 019 020import org.fusesource.hawtdispatch.Dispatch; 021import org.fusesource.hawtdispatch.Task; 022 023import java.util.concurrent.TimeUnit; 024 025/** 026 * <p>A HeartBeatMonitor can be used to watch the read and write 027 * activity of a transport and raise events when the write side 028 * or read side has been idle too long.</p> 029 * 030 * @author <a href="http://hiramchirino.com">Hiram Chirino</a> 031 */ 032public class HeartBeatMonitor { 033 034 Transport transport; 035 long initialWriteCheckDelay; 036 long initialReadCheckDelay; 037 long writeInterval; 038 long readInterval; 039 040 Task onKeepAlive = Dispatch.NOOP; 041 Task onDead = Dispatch.NOOP; 042 043 volatile short session = 0; 044 045 boolean readSuspendedInterval; 046 short readSuspendCount; 047 048 Object lock = new Object(); 049 050 public void suspendRead() { 051 readSuspendCount++; 052 readSuspendedInterval = true; 053 } 054 055 public void resumeRead() { 056 readSuspendCount--; 057 } 058 059 private void schedule(final short session, long interval, final Task func) { 060 if (this.session == session) { 061 transport.getDispatchQueue().executeAfter(interval, TimeUnit.MILLISECONDS, new Task() { 062 public void run() { 063 synchronized (lock) { 064 if (HeartBeatMonitor.this.session == session) { 065 func.run(); 066 } 067 } 068 } 069 }); 070 } 071 } 072 073 private void scheduleCheckWrites(final short session) { 074 final ProtocolCodec codec = transport.getProtocolCodec(); 075 Task func; 076 if (codec == null) { 077 func = new Task() { 078 public void run() { 079 scheduleCheckWrites(session); 080 } 081 }; 082 } else { 083 final long lastWriteCounter = codec.getWriteCounter(); 084 func = new Task() { 085 public void run() { 086 if (lastWriteCounter == codec.getWriteCounter()) { 087 onKeepAlive.run(); 088 } 089 scheduleCheckWrites(session); 090 } 091 }; 092 } 093 schedule(session, writeInterval, func); 094 } 095 096 private void scheduleCheckReads(final short session) { 097 final ProtocolCodec codec = transport.getProtocolCodec(); 098 Task func; 099 if (codec == null) { 100 func = new Task() { 101 public void run() { 102 scheduleCheckReads(session); 103 } 104 }; 105 } else { 106 final long lastReadCounter = codec.getReadCounter(); 107 func = new Task() { 108 public void run() { 109 if (lastReadCounter == codec.getReadCounter() && !readSuspendedInterval && readSuspendCount == 0) { 110 onDead.run(); 111 } 112 readSuspendedInterval = false; 113 scheduleCheckReads(session); 114 } 115 }; 116 } 117 schedule(session, readInterval, func); 118 } 119 120 public void start() { 121 session++; 122 readSuspendedInterval = false; 123 if (writeInterval != 0) { 124 if (initialWriteCheckDelay != 0) { 125 transport.getDispatchQueue().executeAfter(initialWriteCheckDelay, TimeUnit.MILLISECONDS, new Task() { 126 public void run() { 127 scheduleCheckWrites(session); 128 } 129 }); 130 } else { 131 scheduleCheckWrites(session); 132 } 133 } 134 if (readInterval != 0) { 135 if (initialReadCheckDelay != 0) { 136 transport.getDispatchQueue().executeAfter(initialReadCheckDelay, TimeUnit.MILLISECONDS, new Task() { 137 public void run() { 138 scheduleCheckReads(session); 139 } 140 }); 141 } else { 142 scheduleCheckReads(session); 143 } 144 } 145 } 146 147 public void stop() { 148 synchronized (lock) { 149 session++; 150 } 151 } 152 153 154 public long getInitialReadCheckDelay() { 155 return initialReadCheckDelay; 156 } 157 158 public void setInitialReadCheckDelay(long initialReadCheckDelay) { 159 this.initialReadCheckDelay = initialReadCheckDelay; 160 } 161 162 public long getInitialWriteCheckDelay() { 163 return initialWriteCheckDelay; 164 } 165 166 public void setInitialWriteCheckDelay(long initialWriteCheckDelay) { 167 this.initialWriteCheckDelay = initialWriteCheckDelay; 168 } 169 170 public Task getOnDead() { 171 return onDead; 172 } 173 174 public void setOnDead(Task onDead) { 175 this.onDead = onDead; 176 } 177 178 public Task getOnKeepAlive() { 179 return onKeepAlive; 180 } 181 182 public void setOnKeepAlive(Task onKeepAlive) { 183 this.onKeepAlive = onKeepAlive; 184 } 185 186 public long getWriteInterval() { 187 return writeInterval; 188 } 189 190 public void setWriteInterval(long writeInterval) { 191 this.writeInterval = writeInterval; 192 } 193 194 public Transport getTransport() { 195 return transport; 196 } 197 198 public void setTransport(Transport transport) { 199 this.transport = transport; 200 } 201 202 public long getReadInterval() { 203 return readInterval; 204 } 205 206 public void setReadInterval(long readInterval) { 207 this.readInterval = readInterval; 208 } 209}