001/*
002 * Syncany, www.syncany.org
003 * Copyright (C) 2011-2016 Philipp C. Heckel <philipp.heckel@gmail.com>
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU General Public License as published by
007 * the Free Software Foundation, either version 3 of the License, or
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU General Public License for more details.
014 *
015 * You should have received a copy of the GNU General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018package org.syncany.operations.watch;
019
020import java.io.BufferedReader;
021import java.io.IOException;
022import java.io.InputStreamReader;
023import java.io.OutputStream;
024import java.net.Socket;
025import java.net.SocketTimeoutException;
026import java.util.HashSet;
027import java.util.LinkedList;
028import java.util.Queue;
029import java.util.Set;
030import java.util.concurrent.atomic.AtomicBoolean;
031import java.util.logging.Level;
032import java.util.logging.Logger;
033
034import org.syncany.util.StringUtil;
035
036/**
 * The notification listener implements a client to fanout, a very
 * lightweight pub/sub server originally written for SparkleShare.
039 *
040 * <p>Fanout implements a simple TCP-based plaintext protocol.<br>
041 * It implements the following <b>commands</b>:
042 * <ul>
 *  <li><code>subscribe &lt;channel&gt;</code></li>
044 *  <li><code>unsubscribe &lt;channel&gt;</code></li>
045 *  <li><code>announce &lt;channel&gt; &lt;message&gt;</code></li>
046 * </ul>
047 *
048 * <p><b>Notifications</b> have the following format:
049 * <ul>
050 *  <li><code>&lt;channel&gt;!&lt;message&gt;</code></li>
051 * </ul>
052 *
053 * <p>The notification listener starts a thread and listens for incoming messages.
054 * Outgoing messages (subscribe/unsubscribe/announce) are sent directly or (if that
055 * fails), put in an outgoing queue. Incoming messages are handed over to a
056 * {@link NotificationListenerListener}.
057 *
058 * @see <a href="https://github.com/travisghansen/fanout/">https://github.com/travisghansen/fanout/</a> - Fanout source code by Travis G. Hansen
059 * @author Philipp C. Heckel (philipp.heckel@gmail.com)
060 */
061public class NotificationListener {
062        private static final Logger logger = Logger.getLogger(NotificationListener.class.getSimpleName());
063        private static final int SOCKET_TIMEOUT = 10000;
064        private static final int RECONNECT_WAIT_TIME = 5000;
065
066        private String host;
067        private int port;
068        private NotificationListenerListener listener;
069
070        private AtomicBoolean connected;
071        private AtomicBoolean running;
072
073        private Socket socket;
074        private OutputStream socketOut;
075        private BufferedReader socketIn;
076
077        private Set<String> subscribedChannels;
078        private Queue<String> outgoingMessageQueue;
079        private Thread incomingMessageThread;
080
081        public NotificationListener(String host, int port, NotificationListenerListener listener) {
082                this.host = host;
083                this.port = port;
084                this.listener = listener;
085
086                this.subscribedChannels = new HashSet<String>();
087                this.incomingMessageThread = null;
088                this.outgoingMessageQueue = new LinkedList<String>();
089
090                this.connected = new AtomicBoolean(false);
091                this.running = new AtomicBoolean(false);
092        }
093
094        public void start() {
095                logger.log(Level.INFO, "Starting notification listener thread ...");
096
097                stop();
098
099                incomingMessageThread = new SocketThread();
100                incomingMessageThread.start();
101        }
102
103        public void stop() {
104                if (incomingMessageThread != null) {
105                        logger.log(Level.INFO, "Stopping notification listener thread ...");
106
107                        try {
108                                running.set(false);
109
110                                if (socket != null) {
111                                        socket.close();
112                                }
113
114                                if (incomingMessageThread != null) {
115                                        incomingMessageThread.interrupt();
116                                }
117                        }
118                        catch (IOException e) {
119                                logger.log(Level.FINE, "Could not close the socket", e);
120                        }
121                        finally {
122                                incomingMessageThread = null;
123                        }
124                }
125        }
126
127        public void subscribe(String channel) {
128                subscribedChannels.add(channel);
129
130                logger.log(Level.INFO, "Subscribing to channel " + channel + "...");
131                sendMessageOrAddToOutgoingQueue(String.format("subscribe %s\n", channel));
132        }
133
134        public void unsubscribe(String channel) {
135                subscribedChannels.remove(channel);
136
137                logger.log(Level.INFO, "Unsubscribing from channel " + channel + "...");
138                sendMessageOrAddToOutgoingQueue(String.format("unsubscribe %s\n", channel));
139        }
140
141        public void announce(String channel, String message) {
142                logger.log(Level.INFO, "Announcing to channel " + channel + ": " + message.trim());
143                sendMessageOrAddToOutgoingQueue(String.format("announce %s %s\n", channel, message));
144        }
145
146        private void sendMessageOrAddToOutgoingQueue(String message) {
147                if (connected.get()) {
148                        try {
149                                socketOut.write(StringUtil.toBytesUTF8(message));
150                                logger.log(Level.INFO, "Sent message: " + message.trim());
151                        }
152                        catch (IOException e) {
153                                logger.log(Level.FINE, "Could write to the socket", e);
154                                queueOutgoingMessage(message);
155                        }
156                }
157                else {
158                        queueOutgoingMessage(message);
159                }
160        }
161
162        private void queueOutgoingMessage(String message) {
163                if (!outgoingMessageQueue.contains(message)) {
164                        logger.log(Level.INFO, "Sending failed or no connection, queuing message: " + message.trim());
165                        outgoingMessageQueue.offer(message);
166                }
167                else {
168                        logger.log(Level.INFO, "Sending failed and message already in queue: " + message.trim());
169                }
170        }
171
172        private void connect() {
173                try {
174                        logger.log(Level.INFO, "Connecting socket to " + host + ":" + port + " ...");
175
176                        socket = new Socket(host, port);
177                        socket.setSoTimeout(SOCKET_TIMEOUT);
178
179                        socketOut = socket.getOutputStream();
180                        socketIn = new BufferedReader(new InputStreamReader(socket.getInputStream()));
181
182                        connected.set(socket.isConnected());
183                }
184                catch (IOException e) {
185                        logger.log(Level.FINE, "Could not connect the socket", e);
186                        disconnect();
187                }
188        }
189
190        private void disconnect() {
191                try {
192                        logger.log(Level.INFO, "Disconnecting socket ...");
193
194                        if (socket != null) {
195                                socket.close();
196                        }
197
198                        if (socketOut != null) {
199                                socketOut.close();
200                        }
201
202                        if (socketIn != null) {
203                                socketIn.close();
204                        }
205                }
206                catch (IOException e) {
207                        logger.log(Level.FINE, "Could not close the socket", e);
208                }
209                finally {
210                        socket = null;
211                        socketIn = null;
212                        socketOut = null;
213
214                        connected.set(false);
215                }
216        }
217
218        private class SocketThread extends Thread {
219                public SocketThread() {
220                        super("NotifyThread");
221                }
222
223                @Override
224                public void run() {
225                        running.set(true);
226                        connect();
227
228                        while (running.get()) {
229                                try {
230                                        if (socket == null || socketIn == null) {
231                                                throw new Exception("Socket closed");
232                                        }
233
234                                        if (outgoingMessageQueue.size() > 0) {
235                                                logger.log(Level.INFO, "Processing queued outgoing messages ...");
236                                                processOutgoingMessages();
237                                        }
238
239                                        logger.log(Level.INFO, "Waiting for incoming message (" + SOCKET_TIMEOUT + " ms) ...");
240                                        processIncomingMessage(socketIn.readLine());
241                                }
242                                catch (SocketTimeoutException e) {
243                                        // Nothing. Do not log the exception either.
244                                        //logger.log(Level.FINE, "Socket timed out", e);
245                                }
246                                catch (InterruptedException e) {
247                                        logger.log(Level.INFO, "Notification listener interrupted.", e);
248                                        running.set(false);
249                                }
250                                catch (Exception e) {
251                                        try {
252                                                logger.log(Level.INFO, "Notification connection down: " + e.getMessage() + ", sleeping " + RECONNECT_WAIT_TIME
253                                                                + "ms, then trying a re-connect ...");
254
255                                                Thread.sleep(RECONNECT_WAIT_TIME);
256                                                connect();
257
258                                                if (subscribedChannels.size() > 0) {
259                                                        logger.log(Level.INFO, "Re-subscribing to channels after broken connection ...");
260
261                                                        for (String channel : subscribedChannels) {
262                                                                subscribe(channel);
263                                                        }
264                                                }
265                                        }
266                                        catch (InterruptedException e2) {
267                                                logger.log(Level.INFO, "Notification listener interrupted.", e2);
268                                                running.set(false);
269                                        }
270                                }
271                        }
272
273                        logger.log(Level.INFO, "STOPPED notification listener!");
274                }
275
276                private void processOutgoingMessages() throws IOException {
277                        String nextMessage = null;
278
279                        while (null != (nextMessage = outgoingMessageQueue.poll())) {
280                                socketOut.write(StringUtil.toBytesUTF8(nextMessage));
281                                logger.log(Level.INFO, "- Sent queued message " + nextMessage);
282                        }
283                }
284
285                private void processIncomingMessage(String messageLine) {
286                        String[] messageParts = messageLine.split("!");
287
288                        if (messageParts.length == 2) {
289                                String channel = messageParts[0];
290                                String message = messageParts[1];
291
292                                if (!"debug".equals(channel)) {
293                                        logger.log(Level.INFO, "Received message for channel " + channel + ": " + message);
294                                        listener.pushNotificationReceived(channel, message);
295                                }
296                        }
297                }
298        }
299
300        public interface NotificationListenerListener {
301                public void pushNotificationReceived(String channel, String message);
302        }
303}