001/* 002 * Syncany, www.syncany.org 003 * Copyright (C) 2011-2016 Philipp C. Heckel <philipp.heckel@gmail.com> 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU General Public License as published by 007 * the Free Software Foundation, either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU General Public License for more details. 014 * 015 * You should have received a copy of the GNU General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018package org.syncany.operations.watch; 019 020import java.io.BufferedReader; 021import java.io.IOException; 022import java.io.InputStreamReader; 023import java.io.OutputStream; 024import java.net.Socket; 025import java.net.SocketTimeoutException; 026import java.util.HashSet; 027import java.util.LinkedList; 028import java.util.Queue; 029import java.util.Set; 030import java.util.concurrent.atomic.AtomicBoolean; 031import java.util.logging.Level; 032import java.util.logging.Logger; 033 034import org.syncany.util.StringUtil; 035 036/** 037 * The notification listener implements a client to the fanout, as very 038 * lightweight pub/sub server originally written for SparkleShare. 039 * 040 * <p>Fanout implements a simple TCP-based plaintext protocol.<br> 041 * It implements the following <b>commands</b>: 042 * <ul> 043 * <li><code>subcribe <channel></code></li> 044 * <li><code>unsubscribe <channel></code></li> 045 * <li><code>announce <channel> <message></code></li> 046 * </ul> 047 * 048 * <p><b>Notifications</b> have the following format: 049 * <ul> 050 * <li><code><channel>!<message></code></li> 051 * </ul> 052 * 053 * <p>The notification listener starts a thread and listens for incoming messages. 054 * Outgoing messages (subscribe/unsubscribe/announce) are sent directly or (if that 055 * fails), put in an outgoing queue. Incoming messages are handed over to a 056 * {@link NotificationListenerListener}. 057 * 058 * @see <a href="https://github.com/travisghansen/fanout/">https://github.com/travisghansen/fanout/</a> - Fanout source code by Travis G. Hansen 059 * @author Philipp C. Heckel (philipp.heckel@gmail.com) 060 */ 061public class NotificationListener { 062 private static final Logger logger = Logger.getLogger(NotificationListener.class.getSimpleName()); 063 private static final int SOCKET_TIMEOUT = 10000; 064 private static final int RECONNECT_WAIT_TIME = 5000; 065 066 private String host; 067 private int port; 068 private NotificationListenerListener listener; 069 070 private AtomicBoolean connected; 071 private AtomicBoolean running; 072 073 private Socket socket; 074 private OutputStream socketOut; 075 private BufferedReader socketIn; 076 077 private Set<String> subscribedChannels; 078 private Queue<String> outgoingMessageQueue; 079 private Thread incomingMessageThread; 080 081 public NotificationListener(String host, int port, NotificationListenerListener listener) { 082 this.host = host; 083 this.port = port; 084 this.listener = listener; 085 086 this.subscribedChannels = new HashSet<String>(); 087 this.incomingMessageThread = null; 088 this.outgoingMessageQueue = new LinkedList<String>(); 089 090 this.connected = new AtomicBoolean(false); 091 this.running = new AtomicBoolean(false); 092 } 093 094 public void start() { 095 logger.log(Level.INFO, "Starting notification listener thread ..."); 096 097 stop(); 098 099 incomingMessageThread = new SocketThread(); 100 incomingMessageThread.start(); 101 } 102 103 public void stop() { 104 if (incomingMessageThread != null) { 105 logger.log(Level.INFO, "Stopping notification listener thread ..."); 106 107 try { 108 running.set(false); 109 110 if (socket != null) { 111 socket.close(); 112 } 113 114 if (incomingMessageThread != null) { 115 incomingMessageThread.interrupt(); 116 } 117 } 118 catch (IOException e) { 119 logger.log(Level.FINE, "Could not close the socket", e); 120 } 121 finally { 122 incomingMessageThread = null; 123 } 124 } 125 } 126 127 public void subscribe(String channel) { 128 subscribedChannels.add(channel); 129 130 logger.log(Level.INFO, "Subscribing to channel " + channel + "..."); 131 sendMessageOrAddToOutgoingQueue(String.format("subscribe %s\n", channel)); 132 } 133 134 public void unsubscribe(String channel) { 135 subscribedChannels.remove(channel); 136 137 logger.log(Level.INFO, "Unsubscribing from channel " + channel + "..."); 138 sendMessageOrAddToOutgoingQueue(String.format("unsubscribe %s\n", channel)); 139 } 140 141 public void announce(String channel, String message) { 142 logger.log(Level.INFO, "Announcing to channel " + channel + ": " + message.trim()); 143 sendMessageOrAddToOutgoingQueue(String.format("announce %s %s\n", channel, message)); 144 } 145 146 private void sendMessageOrAddToOutgoingQueue(String message) { 147 if (connected.get()) { 148 try { 149 socketOut.write(StringUtil.toBytesUTF8(message)); 150 logger.log(Level.INFO, "Sent message: " + message.trim()); 151 } 152 catch (IOException e) { 153 logger.log(Level.FINE, "Could write to the socket", e); 154 queueOutgoingMessage(message); 155 } 156 } 157 else { 158 queueOutgoingMessage(message); 159 } 160 } 161 162 private void queueOutgoingMessage(String message) { 163 if (!outgoingMessageQueue.contains(message)) { 164 logger.log(Level.INFO, "Sending failed or no connection, queuing message: " + message.trim()); 165 outgoingMessageQueue.offer(message); 166 } 167 else { 168 logger.log(Level.INFO, "Sending failed and message already in queue: " + message.trim()); 169 } 170 } 171 172 private void connect() { 173 try { 174 logger.log(Level.INFO, "Connecting socket to " + host + ":" + port + " ..."); 175 176 socket = new Socket(host, port); 177 socket.setSoTimeout(SOCKET_TIMEOUT); 178 179 socketOut = socket.getOutputStream(); 180 socketIn = new BufferedReader(new InputStreamReader(socket.getInputStream())); 181 182 connected.set(socket.isConnected()); 183 } 184 catch (IOException e) { 185 logger.log(Level.FINE, "Could not connect the socket", e); 186 disconnect(); 187 } 188 } 189 190 private void disconnect() { 191 try { 192 logger.log(Level.INFO, "Disconnecting socket ..."); 193 194 if (socket != null) { 195 socket.close(); 196 } 197 198 if (socketOut != null) { 199 socketOut.close(); 200 } 201 202 if (socketIn != null) { 203 socketIn.close(); 204 } 205 } 206 catch (IOException e) { 207 logger.log(Level.FINE, "Could not close the socket", e); 208 } 209 finally { 210 socket = null; 211 socketIn = null; 212 socketOut = null; 213 214 connected.set(false); 215 } 216 } 217 218 private class SocketThread extends Thread { 219 public SocketThread() { 220 super("NotifyThread"); 221 } 222 223 @Override 224 public void run() { 225 running.set(true); 226 connect(); 227 228 while (running.get()) { 229 try { 230 if (socket == null || socketIn == null) { 231 throw new Exception("Socket closed"); 232 } 233 234 if (outgoingMessageQueue.size() > 0) { 235 logger.log(Level.INFO, "Processing queued outgoing messages ..."); 236 processOutgoingMessages(); 237 } 238 239 logger.log(Level.INFO, "Waiting for incoming message (" + SOCKET_TIMEOUT + " ms) ..."); 240 processIncomingMessage(socketIn.readLine()); 241 } 242 catch (SocketTimeoutException e) { 243 // Nothing. Do not log the exception either. 244 //logger.log(Level.FINE, "Socket timed out", e); 245 } 246 catch (InterruptedException e) { 247 logger.log(Level.INFO, "Notification listener interrupted.", e); 248 running.set(false); 249 } 250 catch (Exception e) { 251 try { 252 logger.log(Level.INFO, "Notification connection down: " + e.getMessage() + ", sleeping " + RECONNECT_WAIT_TIME 253 + "ms, then trying a re-connect ..."); 254 255 Thread.sleep(RECONNECT_WAIT_TIME); 256 connect(); 257 258 if (subscribedChannels.size() > 0) { 259 logger.log(Level.INFO, "Re-subscribing to channels after broken connection ..."); 260 261 for (String channel : subscribedChannels) { 262 subscribe(channel); 263 } 264 } 265 } 266 catch (InterruptedException e2) { 267 logger.log(Level.INFO, "Notification listener interrupted.", e2); 268 running.set(false); 269 } 270 } 271 } 272 273 logger.log(Level.INFO, "STOPPED notification listener!"); 274 } 275 276 private void processOutgoingMessages() throws IOException { 277 String nextMessage = null; 278 279 while (null != (nextMessage = outgoingMessageQueue.poll())) { 280 socketOut.write(StringUtil.toBytesUTF8(nextMessage)); 281 logger.log(Level.INFO, "- Sent queued message " + nextMessage); 282 } 283 } 284 285 private void processIncomingMessage(String messageLine) { 286 String[] messageParts = messageLine.split("!"); 287 288 if (messageParts.length == 2) { 289 String channel = messageParts[0]; 290 String message = messageParts[1]; 291 292 if (!"debug".equals(channel)) { 293 logger.log(Level.INFO, "Received message for channel " + channel + ": " + message); 294 listener.pushNotificationReceived(channel, message); 295 } 296 } 297 } 298 } 299 300 public interface NotificationListenerListener { 301 public void pushNotificationReceived(String channel, String message); 302 } 303}