/*
 * Syncany, www.syncany.org
 * Copyright (C) 2011-2016 Philipp C. Heckel <philipp.heckel@gmail.com>
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
package org.syncany.operations.watch;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.syncany.config.Config;
import org.syncany.config.LocalEventBus;
import org.syncany.database.SqlDatabase;
import org.syncany.operations.Operation;
import org.syncany.operations.cleanup.CleanupOperation;
import org.syncany.operations.cleanup.CleanupOperationResult;
import org.syncany.operations.cleanup.CleanupOperationResult.CleanupResultCode;
import org.syncany.operations.daemon.messages.WatchEndSyncExternalEvent;
import org.syncany.operations.daemon.messages.WatchStartSyncExternalEvent;
import org.syncany.operations.down.DownOperation;
import org.syncany.operations.down.DownOperationResult;
import org.syncany.operations.down.DownOperationResult.DownResultCode;
import org.syncany.operations.up.UpOperation;
import org.syncany.operations.up.UpOperationResult;
import org.syncany.operations.up.UpOperationResult.UpResultCode;
import org.syncany.operations.watch.NotificationListener.NotificationListenerListener;
import org.syncany.operations.watch.RecursiveWatcher.WatchListener;
import org.syncany.util.StringUtil;

/**
 * The watch operation implements the constant synchronization known from other
 * sync tools.
 *
 * <p>In order to sync instantly, it offers the following strategies:
 * <ul>
 *  <li>It monitors the local file system using the {@link DefaultRecursiveWatcher}.
 *      Whenever a file or folder changes, the sync is started (after a short
 *      settlement wait period).</li>
 *  <li>It subscribes to a repo-specific channel on the Syncany pub/sub server,
 *      using the {@link NotificationListener}, and publishes updates to this
 *      channel.</li>
 *  <li>It periodically runs the sync, i.e. the {@link DownOperation} and
 *      subsequently the {@link UpOperation}. If the other two mechanisms are
 *      disabled or fail to register changes, this method will make sure that
 *      changes are synced eventually.</li>
 * </ul>
 *
 * <p>{@link #execute()} runs in a loop and only returns once {@link #stop()} has
 * been called or the thread running it has been interrupted. On the command line,
 * the user has to manually abort the operation.
 *
 * <p>A sync can be triggered from the loop in {@link #execute()} as well as from the
 * listener callbacks {@link #pushNotificationReceived(String, String)} and
 * {@link #watchEventsOccurred()}; the <code>syncRunning</code> flag ensures that
 * only one of these triggers runs the sync at any given time.
 *
 * @author Philipp C. Heckel (philipp.heckel@gmail.com)
 */
public class WatchOperation extends Operation implements NotificationListenerListener, WatchListener {
    private static final Logger logger = Logger.getLogger(WatchOperation.class.getSimpleName());

    /** Maximum time (in milliseconds) a running sync is given to finish after a stop request. */
    private static final int STOP_GRACE_PERIOD = 15 * 1000;

    private final WatchOperationOptions options;

    private final SqlDatabase localDatabase;

    // Written by the thread running execute(), read by stop() callers and the
    // kill timer thread; volatile guarantees that those threads see the assignment.
    private volatile Thread watchThread;
    private final AtomicBoolean syncRunning;
    private final AtomicBoolean syncRequested;
    private final AtomicBoolean stopRequested;
    private final AtomicBoolean pauseRequested;
    private final AtomicInteger upCount;

    private RecursiveWatcher recursiveWatcher;
    private NotificationListener notificationListener;
    private final LocalEventBus eventBus;

    private final String notificationChannel;
    private final String notificationInstanceId;

    /**
     * Creates a new watch operation. Nothing is started until {@link #execute()} is called.
     *
     * @param config  Configuration of the local folder/repository to watch
     * @param options Watch options (interval, watcher/announcement settings, and the options
     *                passed on to the down, up and cleanup operations)
     */
    public WatchOperation(Config config, WatchOperationOptions options) {
        super(config);

        this.options = options;

        this.localDatabase = new SqlDatabase(config);

        this.watchThread = null;
        this.syncRunning = new AtomicBoolean(false);
        this.syncRequested = new AtomicBoolean(false);
        this.stopRequested = new AtomicBoolean(false);
        this.pauseRequested = new AtomicBoolean(false);
        this.upCount = new AtomicInteger(0);

        this.recursiveWatcher = null;
        this.notificationListener = null;
        this.eventBus = LocalEventBus.getInstance();

        // The channel is derived from the repo ID; the random instance ID is sent as the
        // announcement message so that this instance can ignore its own announcements.
        this.notificationChannel = StringUtil.toHex(config.getRepoId());
        this.notificationInstanceId = "" + Math.abs(new Random().nextLong());
    }

    /**
     * Starts the optional notification listener and file system watcher, and then runs
     * the sync loop until a stop is requested or the calling thread is interrupted.
     * Afterwards, the listeners are stopped and the local database is shut down.
     *
     * <p>Failures of a single sync run do not end the loop: they are logged and the sync
     * is retried after the configured interval (unless a pause or stop was requested).
     *
     * @return An empty {@link WatchOperationResult}
     */
    @Override
    public WatchOperationResult execute() {
        watchThread = Thread.currentThread();

        if (options.announcementsEnabled()) {
            startNotificationListener();
        }

        if (options.watcherEnabled()) {
            startRecursiveWatcher();
        }

        syncLoop: while (!stopRequested.get()) {
            try {
                waitWhilePaused();
            }
            catch (InterruptedException e) {
                logger.log(Level.INFO, "Sleep INTERRUPTED during PAUSE. STOPPING.", e);
                break syncLoop;
            }

            try {
                runSync();

                // Only sleep if nothing happened in the meantime that requires immediate attention
                if (!syncRequested.get() && !pauseRequested.get() && !stopRequested.get()) {
                    logger.log(Level.INFO, "Sync done, waiting {0} seconds ...", options.getInterval() / 1000);
                    Thread.sleep(options.getInterval());
                }
            }
            catch (InterruptedException e) {
                logger.log(Level.INFO, "Sync loop INTERRUPTED. STOPPING.", e);
                break syncLoop;
            }
            catch (Exception e) {
                if (pauseRequested.get()) {
                    logger.log(Level.INFO,
                            "Sync FAILED, but PAUSE requested. Normally we would wait a bit and try again, but in this case we don't.", e);
                }
                else if (stopRequested.get()) {
                    logger.log(Level.INFO, "Sync FAILED, but STOP requested.", e);
                    break syncLoop;
                }
                else {
                    logger.log(Level.INFO, String.format("Sync FAILED, waiting %d seconds ...", options.getInterval() / 1000), e);

                    try {
                        Thread.sleep(options.getInterval());
                    }
                    catch (InterruptedException e2) {
                        logger.log(Level.INFO, "Sleep INTERRUPTED during retry-wait. STOPPING.", e2);
                        break syncLoop;
                    }
                }
            }
        }

        // The interrupt flag is intentionally not restored above: the shutdown calls
        // below should run to completion rather than being cut short by a pending interrupt.
        if (options.announcementsEnabled()) {
            stopNotificationListener();
        }

        if (options.watcherEnabled()) {
            stopRecursiveWatcher();
        }

        localDatabase.shutdown();

        return new WatchOperationResult();
    }

    /**
     * Creates and starts the recursive file system watcher for the local directory,
     * ignoring the application, cache, database and log directories. If the watcher
     * cannot be started, a warning is logged and the operation continues without it.
     */
    private void startRecursiveWatcher() {
        logger.log(Level.INFO, "Starting recursive watcher for " + config.getLocalDir() + " ...");

        Path localDir = Paths.get(config.getLocalDir().getAbsolutePath());
        List<Path> ignorePaths = new ArrayList<Path>();

        ignorePaths.add(Paths.get(config.getAppDir().getAbsolutePath()));
        ignorePaths.add(Paths.get(config.getCacheDir().getAbsolutePath()));
        ignorePaths.add(Paths.get(config.getDatabaseDir().getAbsolutePath()));
        ignorePaths.add(Paths.get(config.getLogDir().getAbsolutePath()));

        recursiveWatcher = RecursiveWatcher.createRecursiveWatcher(localDir, ignorePaths, options.getSettleDelay(), this);

        try {
            recursiveWatcher.start();
        }
        catch (Exception e) {
            logger.log(Level.WARNING, "Cannot initiate file watcher. Relying on regular tree walks.", e);
        }
    }

    /**
     * Creates and starts the notification listener using the announcement host/port
     * from the options, and subscribes it to the repo-specific notification channel.
     */
    private void startNotificationListener() {
        logger.log(Level.INFO, "Starting notification listener for " + config.getLocalDir() + " ...");

        notificationListener = new NotificationListener(options.getAnnouncementsHost(), options.getAnnouncementsPort(), this);
        notificationListener.start();

        notificationListener.subscribe(notificationChannel);
    }

    /**
     * Stops the recursive file system watcher. Failures are logged, not propagated.
     */
    private void stopRecursiveWatcher() {
        try {
            logger.log(Level.INFO, "Stopping recursive watcher for " + config.getLocalDir() + " ...");
            recursiveWatcher.stop();
        }
        catch (Exception e) {
            logger.log(Level.WARNING, "Cannot stop file watcher.", e);
        }
    }

    /**
     * Stops the notification listener.
     */
    private void stopNotificationListener() {
        logger.log(Level.INFO, "Stopping notification listener for " + config.getLocalDir() + " ...");
        notificationListener.stop();
    }

    /**
     * Runs one iteration of the main synchronization loop, containing a {@link DownOperation},
     * an {@link UpOperation} and a {@link CleanupOperation}.
     *
     * <p>If a sync is already running, this method does not run another one; it only marks
     * a sync as requested and returns immediately. The running flag is claimed atomically,
     * so that concurrent callers cannot both start a sync.
     *
     * @throws Exception If any of the executed operations fails
     */
    private void runSync() throws Exception {
        // Atomically claim the "running" flag. A separate get() followed by set(true)
        // would allow two threads to pass the check at the same time.
        if (syncRunning.compareAndSet(false, true)) {
            syncRequested.set(false);

            logger.log(Level.INFO, "RUNNING SYNC ...");
            fireStartEvent();

            try {
                boolean notifyChanges = false;

                // Run down
                DownOperationResult downResult = new DownOperation(config, options.getDownOptions()).execute();

                if (downResult.getResultCode() == DownResultCode.OK_WITH_REMOTE_CHANGES) {
                    // TODO [low] Do something?
                }

                // Run up
                UpOperationResult upOperationResult = new UpOperation(config, options.getUpOptions()).execute();

                if (upOperationResult.getResultCode() == UpResultCode.OK_CHANGES_UPLOADED && upOperationResult.getChangeSet().hasChanges()) {
                    upCount.incrementAndGet();
                    notifyChanges = true;
                }

                // Run cleanup
                CleanupOperationResult cleanupOperationResult = new CleanupOperation(config, options.getCleanupOptions()).execute();

                if (cleanupOperationResult.getResultCode() == CleanupResultCode.OK) {
                    notifyChanges = true;
                }

                // Fire change event if up and/or cleanup
                if (notifyChanges) {
                    notifyChanges();
                }
            }
            finally {
                logger.log(Level.INFO, "SYNC DONE.");
                syncRunning.set(false);

                fireEndEvent();
            }
        }
        else {
            // Can't do a log message here, because this bit is called thousand
            // of times when file system events occur.

            syncRequested.set(true);
        }
    }

    /**
     * Called when a message was received on a subscribed channel. Triggers a sync if the
     * message was received on this repository's channel and was not sent by this instance.
     * Failures (including an interrupt while waiting for a pause to end) are logged only.
     *
     * @param channel Channel the message was received on
     * @param message Message content; compared against this instance's ID
     */
    @Override
    public void pushNotificationReceived(String channel, String message) {
        if (channel.equals(notificationChannel) && !message.equals(notificationInstanceId)) {
            try {
                waitWhilePaused();
                runSync();
            }
            catch (Exception e) {
                logger.log(Level.INFO, "Sync FAILED (event-triggered).", e);
            }
        }
    }

    /**
     * Called when the file system watcher registered changes. Triggers a sync; failures
     * (including an interrupt while waiting for a pause to end) are logged only.
     */
    @Override
    public void watchEventsOccurred() {
        try {
            waitWhilePaused();
            runSync();
        }
        catch (Exception e) {
            logger.log(Level.INFO, "Sync FAILED (event-triggered).", e);
        }
    }

    /**
     * Announces this instance's ID on the repository's notification channel, if a
     * notification listener has been started.
     */
    private void notifyChanges() {
        if (notificationListener != null) {
            notificationListener.announce(notificationChannel, notificationInstanceId);
        }
    }

    /**
     * Requests a pause. Sync triggers block (polling once per second) until
     * {@link #resume()} is called; a sync that is already running is not affected.
     */
    public void pause() {
        pauseRequested.set(true);
    }

    /**
     * Clears a previously requested pause.
     */
    public void resume() {
        pauseRequested.set(false);
    }

    /**
     * Requests the watch operation to stop. If no sync is running, the watch thread is
     * interrupted immediately; otherwise the interrupt is scheduled after a grace period
     * of {@link #STOP_GRACE_PERIOD} milliseconds. Only the first call has an effect;
     * subsequent calls are logged and ignored.
     */
    public void stop() {
        // compareAndSet makes sure that exactly one caller wins, even if stop()
        // is called concurrently from several threads.
        if (stopRequested.compareAndSet(false, true)) {
            if (syncRunning.get()) {
                logger.log(Level.INFO, "Stop requested, but sync process currently running. Waiting max. " + STOP_GRACE_PERIOD
                        + "ms for sync to finish.");
                scheduleForceKill();
            }
            else {
                logger.log(Level.INFO, "Stop requested, but sync is NOT running. Immediately stopping thread.");
                forceKillWatchThread();
            }
        }
        else {
            logger.log(Level.INFO, "Stop requested AGAIN, but was requested before. IGNORING.");
        }
    }

    /**
     * @return <code>true</code> if a sync is currently running
     */
    public boolean isSyncRunning() {
        return syncRunning.get();
    }

    /**
     * @return <code>true</code> if a sync was requested while another sync was running
     *         and no new sync has been started since
     */
    public boolean isSyncRequested() {
        return syncRequested.get();
    }

    /**
     * Blocks the calling thread for as long as a pause is requested, checking the
     * pause flag once per second.
     *
     * @throws InterruptedException If the calling thread is interrupted while waiting
     */
    private void waitWhilePaused() throws InterruptedException {
        while (pauseRequested.get()) {
            Thread.sleep(1000);
        }
    }

    /**
     * Schedules an interrupt of the watch thread after {@link #STOP_GRACE_PERIOD}
     * milliseconds, using a dedicated one-shot timer.
     */
    private void scheduleForceKill() {
        String killTimerName = "Kill/" + config.getLocalDir().getName();
        final Timer killTimer = new Timer(killTimerName);

        killTimer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    logger.log(Level.INFO, "STOP GRACE PERIOD OVER. STOPPING WATCH " + config.getLocalDir() + " ...");
                    interruptWatchThread();
                }
                catch (Exception e) {
                    logger.log(Level.INFO, "Forcefully stopping watch thread FAILED at " + config.getLocalDir() + ". Giving up.", e);
                }
                finally {
                    // One-shot timer: cancel it so that its (non-daemon) thread
                    // terminates instead of lingering after the task has run.
                    killTimer.cancel();
                }
            }
        }, STOP_GRACE_PERIOD);
    }

    /**
     * Interrupts the watch thread right away. Failures are logged, not propagated.
     */
    private void forceKillWatchThread() {
        try {
            logger.log(Level.INFO, "STOPPING WATCH " + config.getLocalDir() + " ...");
            interruptWatchThread();
        }
        catch (Exception e) {
            logger.log(Level.INFO, "Forcefully stopping watch thread FAILED at " + config.getLocalDir() + ". Giving up.", e);
        }
    }

    /**
     * Interrupts the thread running {@link #execute()}, if there is one and its
     * interrupt flag is not already set.
     */
    private void interruptWatchThread() {
        Thread threadToInterrupt = watchThread; // read the volatile field only once

        if (threadToInterrupt != null && !threadToInterrupt.isInterrupted()) {
            threadToInterrupt.interrupt();
        }
    }

    /**
     * Posts a {@link WatchStartSyncExternalEvent} for the local directory to the event bus.
     */
    private void fireStartEvent() {
        eventBus.post(new WatchStartSyncExternalEvent(config.getLocalDir().getAbsolutePath()));
    }

    /**
     * Posts a {@link WatchEndSyncExternalEvent} for the local directory to the event bus.
     */
    private void fireEndEvent() {
        eventBus.post(new WatchEndSyncExternalEvent(config.getLocalDir().getAbsolutePath()));
    }
}