001/*
002 * Syncany, www.syncany.org
003 * Copyright (C) 2011-2016 Philipp C. Heckel <philipp.heckel@gmail.com>
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU General Public License as published by
007 * the Free Software Foundation, either version 3 of the License, or
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU General Public License for more details.
014 *
015 * You should have received a copy of the GNU General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018package org.syncany.operations.watch;
019
020import java.nio.file.Path;
021import java.nio.file.Paths;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.Random;
025import java.util.Timer;
026import java.util.TimerTask;
027import java.util.concurrent.atomic.AtomicBoolean;
028import java.util.concurrent.atomic.AtomicInteger;
029import java.util.logging.Level;
030import java.util.logging.Logger;
031
032import org.syncany.config.Config;
033import org.syncany.config.LocalEventBus;
034import org.syncany.database.SqlDatabase;
035import org.syncany.operations.Operation;
036import org.syncany.operations.cleanup.CleanupOperation;
037import org.syncany.operations.cleanup.CleanupOperationResult;
038import org.syncany.operations.cleanup.CleanupOperationResult.CleanupResultCode;
039import org.syncany.operations.daemon.messages.WatchEndSyncExternalEvent;
040import org.syncany.operations.daemon.messages.WatchStartSyncExternalEvent;
041import org.syncany.operations.down.DownOperation;
042import org.syncany.operations.down.DownOperationResult;
043import org.syncany.operations.down.DownOperationResult.DownResultCode;
044import org.syncany.operations.up.UpOperation;
045import org.syncany.operations.up.UpOperationResult;
046import org.syncany.operations.up.UpOperationResult.UpResultCode;
047import org.syncany.operations.watch.NotificationListener.NotificationListenerListener;
048import org.syncany.operations.watch.RecursiveWatcher.WatchListener;
049import org.syncany.util.StringUtil;
050
051/**
052 * The watch operation implements the constant synchronization known from other
053 * sync tools.
054 *
055 * <p>In order to sync instantly, it offers the following strategies:
056 * <ul>
057 *  <li>It monitors the local file system using the {@link DefaultRecursiveWatcher}.
058 *      Whenever a file or folder changes, the sync is started (after a short
059 *      settlement wait period).</li>
060 *  <li>It subscribes to a repo-specific channel on the Syncany pub/sub server,
061 *      using the {@link NotificationListener}, and publishes updates to this
062 *      channel.</li>
063 *  <li>It periodically runs the sync, i.e. the {@link DownOperation} and
064 *      subsequently the {@link UpOperation}. If the other two mechanisms are
065 *      disabled or fail to register changes, this method will make sure that
066 *      changes are synced eventually.</li>
067 * </ul>
068 *
069 * As of now, this operation never returns, because it runs in a loop. The user
070 * has to manually abort the operation on the command line.
071 *
072 * @author Philipp C. Heckel (philipp.heckel@gmail.com)
073 */
074public class WatchOperation extends Operation implements NotificationListenerListener, WatchListener {
075        private static final Logger logger = Logger.getLogger(WatchOperation.class.getSimpleName());
076        private static final int STOP_GRACE_PERIOD = 15 * 1000;
077
078        private WatchOperationOptions options;
079
080        private SqlDatabase localDatabase;
081
082        private Thread watchThread;
083        private AtomicBoolean syncRunning;
084        private AtomicBoolean syncRequested;
085        private AtomicBoolean stopRequested;
086        private AtomicBoolean pauseRequested;
087        private AtomicInteger upCount;
088
089        private RecursiveWatcher recursiveWatcher;
090        private NotificationListener notificationListener;
091        private LocalEventBus eventBus;
092
093        private String notificationChannel;
094        private String notificationInstanceId;
095
096        public WatchOperation(Config config, WatchOperationOptions options) {
097                super(config);
098
099                this.options = options;
100
101                this.localDatabase = new SqlDatabase(config);
102
103                this.watchThread = null;
104                this.syncRunning = new AtomicBoolean(false);
105                this.syncRequested = new AtomicBoolean(false);
106                this.stopRequested = new AtomicBoolean(false);
107                this.pauseRequested = new AtomicBoolean(false);
108                this.upCount = new AtomicInteger(0);
109
110                this.recursiveWatcher = null;
111                this.notificationListener = null;
112                this.eventBus = LocalEventBus.getInstance();
113
114                this.notificationChannel = StringUtil.toHex(config.getRepoId());
115                this.notificationInstanceId = "" + Math.abs(new Random().nextLong());
116        }
117
118        @Override
119        public WatchOperationResult execute() {
120                watchThread = Thread.currentThread();
121
122                if (options.announcementsEnabled()) {
123                        startNotificationListener();
124                }
125
126                if (options.watcherEnabled()) {
127                        startRecursiveWatcher();
128                }
129
130                syncLoop: while (!stopRequested.get()) {
131                        try {
132                                waitWhilePaused();
133                        }
134                        catch (InterruptedException e) {
135                                logger.log(Level.INFO, "Sleep INTERRUPTED during PAUSE. STOPPING.", e);
136                                break syncLoop;
137                        }
138
139                        try {
140                                runSync();
141
142                                if (!syncRequested.get() && !pauseRequested.get() && !stopRequested.get()) {
143                                        logger.log(Level.INFO, "Sync done, waiting {0} seconds ...", options.getInterval() / 1000);
144                                        Thread.sleep(options.getInterval());
145                                }
146                        }
147                        catch (InterruptedException e) {
148                                logger.log(Level.INFO, "Sync loop INTERRUPTED. STOPPING.", e);
149                                break syncLoop;
150                        }
151                        catch (Exception e) {
152                                if (pauseRequested.get()) {
153                                        logger.log(Level.INFO,
154                                                        "Sync FAILED, but PAUSE requested. Normally we would wait a bit and try again, but in this case we don't.", e);
155                                }
156                                else if (stopRequested.get()) {
157                                        logger.log(Level.INFO, "Sync FAILED, but STOP requested.", e);
158                                        break syncLoop;
159                                }
160                                else {
161                                        logger.log(Level.INFO, String.format("Sync FAILED, waiting %d seconds ...", options.getInterval() / 1000), e);
162
163                                        try {
164                                                Thread.sleep(options.getInterval());
165                                        }
166                                        catch (InterruptedException e2) {
167                                                logger.log(Level.INFO, "Sleep INTERRUPTED during retry-wait. STOPPING.", e2);
168                                                break syncLoop;
169                                        }
170                                }
171                        }
172                }
173
174                if (options.announcementsEnabled()) {
175                        stopNotificationListener();
176                }
177
178                if (options.watcherEnabled()) {
179                        stopRecursiveWatcher();
180                }
181
182                localDatabase.shutdown();
183
184                return new WatchOperationResult();
185        }
186
187        private void startRecursiveWatcher() {
188                logger.log(Level.INFO, "Starting recursive watcher for " + config.getLocalDir() + " ...");
189
190                Path localDir = Paths.get(config.getLocalDir().getAbsolutePath());
191                List<Path> ignorePaths = new ArrayList<Path>();
192
193                ignorePaths.add(Paths.get(config.getAppDir().getAbsolutePath()));
194                ignorePaths.add(Paths.get(config.getCacheDir().getAbsolutePath()));
195                ignorePaths.add(Paths.get(config.getDatabaseDir().getAbsolutePath()));
196                ignorePaths.add(Paths.get(config.getLogDir().getAbsolutePath()));
197
198                recursiveWatcher = RecursiveWatcher.createRecursiveWatcher(localDir, ignorePaths, options.getSettleDelay(), this);
199
200                try {
201                        recursiveWatcher.start();
202                }
203                catch (Exception e) {
204                        logger.log(Level.WARNING, "Cannot initiate file watcher. Relying on regular tree walks.", e);
205                }
206        }
207
208        private void startNotificationListener() {
209                logger.log(Level.INFO, "Starting notification listener for " + config.getLocalDir() + " ...");
210
211                notificationListener = new NotificationListener(options.getAnnouncementsHost(), options.getAnnouncementsPort(), this);
212                notificationListener.start();
213
214                notificationListener.subscribe(notificationChannel);
215        }
216
217        private void stopRecursiveWatcher() {
218                try {
219                        logger.log(Level.INFO, "Stopping recursive watcher for " + config.getLocalDir() + " ...");
220                        recursiveWatcher.stop();
221                }
222                catch (Exception e) {
223                        logger.log(Level.WARNING, "Cannot stop file watcher.", e);
224                }
225        }
226
227        private void stopNotificationListener() {
228                logger.log(Level.INFO, "Stopping notification listener for " + config.getLocalDir() + " ...");
229                notificationListener.stop();
230        }
231
232        /**
233         * Runs one iteration of the main synchronization loop, containing a {@link DownOperation},
234         * an {@link UpOperation} and (if required), a {@link CleanupOperation}.
235         */
236        private void runSync() throws Exception {
237                if (!syncRunning.get()) {
238                        syncRunning.set(true);
239                        syncRequested.set(false);
240
241                        logger.log(Level.INFO, "RUNNING SYNC ...");
242                        fireStartEvent();
243                        try {
244                                boolean notifyChanges = false;
245
246                                // Run down
247                                DownOperationResult downResult = new DownOperation(config, options.getDownOptions()).execute();
248
249                                if (downResult.getResultCode() == DownResultCode.OK_WITH_REMOTE_CHANGES) {
250                                        // TODO [low] Do something?
251                                }
252
253                                // Run up
254                                UpOperationResult upOperationResult = new UpOperation(config, options.getUpOptions()).execute();
255
256                                if (upOperationResult.getResultCode() == UpResultCode.OK_CHANGES_UPLOADED && upOperationResult.getChangeSet().hasChanges()) {
257                                        upCount.incrementAndGet();
258                                        notifyChanges = true;
259                                }
260
261                                CleanupOperationResult cleanupOperationResult = new CleanupOperation(config, options.getCleanupOptions()).execute();
262
263                                if (cleanupOperationResult.getResultCode() == CleanupResultCode.OK) {
264                                        notifyChanges = true;
265                                }
266
267                                // Fire change event if up and/or cleanup
268                                if (notifyChanges) {
269                                        notifyChanges();
270                                }
271                        }
272                        finally {
273                                logger.log(Level.INFO, "SYNC DONE.");
274                                syncRunning.set(false);
275                                
276                                fireEndEvent();
277                        }
278                }
279                else {
280                        // Can't do a log message here, because this bit is called thousand
281                        // of times when file system events occur.
282
283                        syncRequested.set(true);
284                }
285        }
286
287        @Override
288        public void pushNotificationReceived(String channel, String message) {
289                if (channel.equals(notificationChannel) && !message.equals(notificationInstanceId)) {
290                        try {
291                                waitWhilePaused();
292                                runSync();
293                        }
294                        catch (Exception e) {
295                                logger.log(Level.INFO, "Sync FAILED (event-triggered).", e);
296                        }
297                }
298        }
299
300        @Override
301        public void watchEventsOccurred() {
302                try {
303                        waitWhilePaused();
304                        runSync();
305                }
306                catch (Exception e) {
307                        logger.log(Level.INFO, "Sync FAILED (event-triggered).", e);
308                }
309        }
310
311        private void notifyChanges() {
312                if (notificationListener != null) {
313                        notificationListener.announce(notificationChannel, notificationInstanceId);
314                }
315        }
316
317        public void pause() {
318                pauseRequested.set(true);
319        }
320
321        public void resume() {
322                pauseRequested.set(false);
323        }
324
325        public void stop() {
326                if (!stopRequested.get()) {
327                        stopRequested.set(true);
328
329                        if (syncRunning.get()) {
330                                logger.log(Level.INFO, "Stop requested, but sync process currently running. Waiting max. " + STOP_GRACE_PERIOD
331                                                + "ms for sync to finish.");
332                                scheduleForceKill();
333                        }
334                        else {
335                                logger.log(Level.INFO, "Stop requested, but sync is NOT running. Immediately stopping thread.");
336                                forceKillWatchThread();
337                        }
338                }
339                else {
340                        logger.log(Level.INFO, "Stop requested AGAIN, but was requested before. IGNORING.");
341                }
342        }
343
344        public boolean isSyncRunning() {
345                return syncRunning.get();
346        }
347
348        public boolean isSyncRequested() {
349                return syncRequested.get();
350        }
351
352        private void waitWhilePaused() throws InterruptedException {
353                while (pauseRequested.get()) {
354                        Thread.sleep(1000);
355                }
356        }
357
358        private void scheduleForceKill() {
359                String killTimerName = "Kill/" + config.getLocalDir().getName();
360
361                new Timer(killTimerName).schedule(new TimerTask() {
362                        @Override
363                        public void run() {
364                                try {
365                                        logger.log(Level.INFO, "STOP GRACE PERIOD OVER. STOPPING WATCH " + config.getLocalDir() + " ...");
366
367                                        if (watchThread != null && !watchThread.isInterrupted()) {
368                                                watchThread.interrupt();
369                                        }
370                                }
371                                catch (Exception e) {
372                                        logger.log(Level.INFO, "Forcefully stopping watch thread FAILED at " + config.getLocalDir() + ". Giving up.");
373                                }
374                        }
375                }, STOP_GRACE_PERIOD);
376        }
377
378        private void forceKillWatchThread() {
379                try {
380                        logger.log(Level.INFO, "STOPPING WATCH " + config.getLocalDir() + " ...");
381
382                        if (watchThread != null && !watchThread.isInterrupted()) {
383                                watchThread.interrupt();
384                        }
385                }
386                catch (Exception e) {
387                        logger.log(Level.INFO, "Forcefully stopping watch thread FAILED at " + config.getLocalDir() + ". Giving up.", e);
388                }
389        }
390
391        private void fireStartEvent() {
392                eventBus.post(new WatchStartSyncExternalEvent(config.getLocalDir().getAbsolutePath())); 
393        }
394
395
396        private void fireEndEvent() {
397                eventBus.post(new WatchEndSyncExternalEvent(config.getLocalDir().getAbsolutePath()));   
398        }
399}