001/* 002 * Syncany, www.syncany.org 003 * Copyright (C) 2011-2016 Philipp C. Heckel <philipp.heckel@gmail.com> 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU General Public License as published by 007 * the Free Software Foundation, either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU General Public License for more details. 014 * 015 * You should have received a copy of the GNU General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018package org.syncany.operations.up; 019 020import java.io.File; 021import java.io.FileOutputStream; 022import java.io.IOException; 023import java.io.OutputStreamWriter; 024import java.io.PrintWriter; 025import java.sql.SQLException; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.Collection; 029import java.util.Date; 030import java.util.Iterator; 031import java.util.List; 032import java.util.concurrent.BlockingQueue; 033import java.util.concurrent.LinkedBlockingQueue; 034import java.util.logging.Level; 035import java.util.logging.Logger; 036 037import org.syncany.chunk.Deduper; 038import org.syncany.config.Config; 039import org.syncany.database.ChunkEntry; 040import org.syncany.database.DatabaseVersion; 041import org.syncany.database.DatabaseVersionHeader; 042import org.syncany.database.FileContent; 043import org.syncany.database.FileVersion; 044import org.syncany.database.MemoryDatabase; 045import org.syncany.database.MultiChunkEntry; 046import org.syncany.database.MultiChunkEntry.MultiChunkId; 047import org.syncany.database.PartialFileHistory; 048import org.syncany.database.SqlDatabase; 049import org.syncany.database.VectorClock; 050import org.syncany.database.dao.DatabaseXmlSerializer; 051import org.syncany.database.dao.DatabaseXmlSerializer.DatabaseReadType; 052import org.syncany.operations.AbstractTransferOperation; 053import org.syncany.operations.ChangeSet; 054import org.syncany.operations.cleanup.CleanupOperation; 055import org.syncany.operations.daemon.messages.UpEndSyncExternalEvent; 056import org.syncany.operations.daemon.messages.UpStartSyncExternalEvent; 057import org.syncany.operations.down.DownOperation; 058import org.syncany.operations.ls_remote.LsRemoteOperation; 059import org.syncany.operations.ls_remote.LsRemoteOperationResult; 060import org.syncany.operations.status.StatusOperation; 061import org.syncany.operations.status.StatusOperationResult; 062import org.syncany.operations.up.UpOperationResult.UpResultCode; 063import org.syncany.plugins.transfer.RemoteTransaction; 064import org.syncany.plugins.transfer.StorageException; 065import org.syncany.plugins.transfer.TransferManager; 066import org.syncany.plugins.transfer.files.DatabaseRemoteFile; 067import org.syncany.plugins.transfer.files.MultichunkRemoteFile; 068import org.syncany.plugins.transfer.files.TransactionRemoteFile; 069import org.syncany.plugins.transfer.to.ActionTO; 070import org.syncany.plugins.transfer.to.ActionTO.ActionStatus; 071import org.syncany.plugins.transfer.to.ActionTO.ActionType; 072import org.syncany.plugins.transfer.to.TransactionTO; 073 074/** 075 * The up operation implements a central part of Syncany's business logic. It analyzes the local 076 * folder, deduplicates new or changed files and uploads newly packed multichunks to the remote 077 * storage. The up operation is the complement to the {@link DownOperation}. 078 * 079 * <p>The general operation flow is as follows: 080 * <ol> 081 * <li>Load local database (if not already loaded)</li> 082 * <li>Analyze local directory using the {@link StatusOperation} to determine any changed/new/deleted files</li> 083 * <li>Determine if there are unknown remote databases using the {@link LsRemoteOperation}, and skip the rest if there are</li> 084 * <li>If there are changes, use the {@link Deduper} and {@link Indexer} to create a new {@link DatabaseVersion} 085 * (including new chunks, multichunks, file contents and file versions).</li> 086 * <li>Upload new multichunks (if any) using a {@link TransferManager}</li> 087 * <li>Save new {@link DatabaseVersion} to a new (delta) {@link MemoryDatabase} and upload it</li> 088 * <li>Add delta database to local database and store it locally</li> 089 * </ol> 090 * 091 * Besides the normal behavior of creating transactions from local changes and uploading these, 092 * this class is also able to upload existing transactions that have been interrupted during a previous upload attempt. 093 * The up operation analyzes maps the local changes over a number of transactions. The size of these transactions are based 094 * on the settings in {@link UpOperationOptions}. 095 * If a sequence of transactions is interrupted, all queued transactions are written to disk to be resumed later. 096 * The next up operation then reads these transactions and resumes them in the same order as they were queued before the interruption. 097 * 098 * @author Philipp C. Heckel (philipp.heckel@gmail.com) 099 */ 100public class UpOperation extends AbstractTransferOperation { 101 private static final Logger logger = Logger.getLogger(UpOperation.class.getSimpleName()); 102 103 public static final String ACTION_ID = "up"; 104 105 private UpOperationOptions options; 106 private UpOperationResult result; 107 108 private SqlDatabase localDatabase; 109 110 private boolean resuming; 111 private TransactionRemoteFile transactionRemoteFileToResume; 112 private Collection<RemoteTransaction> remoteTransactionsToResume; 113 private BlockingQueue<DatabaseVersion> databaseVersionQueue; 114 115 public UpOperation(Config config) { 116 this(config, new UpOperationOptions()); 117 } 118 119 public UpOperation(Config config, UpOperationOptions options) { 120 super(config, ACTION_ID); 121 122 this.options = options; 123 this.result = new UpOperationResult(); 124 this.localDatabase = new SqlDatabase(config); 125 126 this.resuming = false; 127 this.transactionRemoteFileToResume = null; 128 this.remoteTransactionsToResume = null; 129 this.databaseVersionQueue = new LinkedBlockingQueue<>(); 130 } 131 132 @Override 133 public UpOperationResult execute() throws Exception { 134 logger.log(Level.INFO, ""); 135 logger.log(Level.INFO, "Running 'Sync up' at client " + config.getMachineName() + " ..."); 136 logger.log(Level.INFO, "--------------------------------------------"); 137 138 fireStartEvent(); 139 140 if (!checkPreconditions()) { 141 fireEndEvent(); 142 return result; 143 } 144 145 // Upload action file (lock for cleanup) 146 startOperation(); 147 148 try { 149 if (options.isResume()) { 150 prepareResume(); 151 } 152 153 if (!resuming) { 154 startIndexerThread(databaseVersionQueue); 155 } 156 157 // If we are not resuming from a remote transaction, we need to clean transactions. 158 if (transactionRemoteFileToResume == null) { 159 transferManager.cleanTransactions(); 160 } 161 } 162 catch (BlockingTransfersException e) { 163 stopBecauseOfBlockingTransactions(); 164 return result; 165 } 166 167 // Go wild 168 int numberOfPerformedTransactions = executeTransactions(); 169 updateResult(numberOfPerformedTransactions); 170 171 // Close database connection 172 localDatabase.finalize(); 173 174 // Finish 'up' before 'cleanup' starts 175 finishOperation(); 176 fireEndEvent(); 177 178 return result; 179 } 180 181 private void updateResult(int numberOfPerformedTransactions) { 182 if (numberOfPerformedTransactions == 0) { 183 logger.log(Level.INFO, "Local database is up-to-date. NOTHING TO DO!"); 184 result.setResultCode(UpResultCode.OK_NO_CHANGES); 185 } 186 else { 187 logger.log(Level.INFO, "Sync up done."); 188 result.setResultCode(UpResultCode.OK_CHANGES_UPLOADED); 189 } 190 } 191 192 private void startIndexerThread(BlockingQueue<DatabaseVersion> databaseVersionQueue) { 193 // Get a list of files that have been updated 194 ChangeSet localChanges = result.getStatusResult().getChangeSet(); 195 List<File> locallyUpdatedFiles = extractLocallyUpdatedFiles(localChanges); 196 List<File> locallyDeletedFiles = extractLocallyDeletedFiles(localChanges); 197 198 // Iterate over the changes, deduplicate, and feed DatabaseVersions into an iterator 199 Deduper deduper = new Deduper(config.getChunker(), config.getMultiChunker(), config.getTransformer(), options.getTransactionSizeLimit(), 200 options.getTransactionFileLimit()); 201 202 AsyncIndexer asyncIndexer = new AsyncIndexer(config, deduper, locallyUpdatedFiles, locallyDeletedFiles, databaseVersionQueue); 203 new Thread(asyncIndexer, "AsyncI/" + config.getLocalDir().getName()).start(); 204 } 205 206 private void prepareResume() throws Exception { 207 Collection<Long> versionsToResume = transferManager.loadPendingTransactionList(); 208 boolean hasVersionsToResume = versionsToResume != null && versionsToResume.size() > 0; 209 210 if (hasVersionsToResume) { 211 logger.log(Level.INFO, "Found local transaction to resume."); 212 logger.log(Level.INFO, "Attempting to find transactionRemoteFile"); 213 214 remoteTransactionsToResume = attemptResumeTransactions(versionsToResume); 215 Collection<DatabaseVersion> remoteDatabaseVersionsToResume = attemptResumeDatabaseVersions(versionsToResume); 216 217 resuming = remoteDatabaseVersionsToResume != null && remoteTransactionsToResume != null && 218 remoteDatabaseVersionsToResume.size() == remoteTransactionsToResume.size(); 219 220 if (resuming) { 221 databaseVersionQueue.addAll(remoteDatabaseVersionsToResume); 222 databaseVersionQueue.add(new DatabaseVersion()); // Empty database version is the stopping marker 223 224 transactionRemoteFileToResume = attemptResumeTransactionRemoteFile(); 225 } 226 else { 227 transferManager.clearResumableTransactions(); 228 } 229 } 230 else { 231 transferManager.clearResumableTransactions(); 232 } 233 } 234 235 /** 236 * Transfers the given {@link DatabaseVersion} objects to the remote. 237 * Each {@link DatabaseVersion} will be transferred in its own {@link RemoteTransaction} object. 238 * 239 * This method resumes an interrupted sequence of earlier transactions. 240 * It expects the {@link DatabaseVersion} and {@link RemoteTransaction} files to be in the same order as they were originally generated. 241 * The first {@link DatabaseVersion} and {@link RemoteTransaction} objects should match the interrupted transaction. 242 * 243 * The assumption is that the given {@link RemoteTransaction} objects match the given {@link DatabaseVersion} objects. 244 * The given {@link TransactionRemoteFile} corresponds to the file on the remote from the interrupted transaction. 245 * 246 * @param databaseVersionQueue The {@link DatabaseVersion} objects to send to the remote. 247 * @param remoteTransactionsToResume {@link RemoteTransaction} objects that correspond to the given {@link DatabaseVersion} objects. 248 * @param transactionRemoteFileToResume The file on the remote that was used for the specific transaction that was interrupted. 249 */ 250 private int executeTransactions() throws Exception { 251 Iterator<RemoteTransaction> remoteTransactionsToResumeIterator = (resuming) ? remoteTransactionsToResume.iterator() : null; 252 253 // At this point, if a failure occurs from which we can resume, new transaction files will be written 254 // Delete any old transaction files 255 transferManager.clearPendingTransactions(); 256 257 boolean detectedFailure = false; 258 Exception caughtFailure = null; 259 List<RemoteTransaction> remainingRemoteTransactions = new ArrayList<>(); 260 List<DatabaseVersion> remainingDatabaseVersions = new ArrayList<>(); 261 262 DatabaseVersion databaseVersion = databaseVersionQueue.take(); 263 boolean noDatabaseVersions = databaseVersion.isEmpty(); 264 265 // Add dirty data to first database 266 addDirtyData(databaseVersion); 267 268 // 269 while (!databaseVersion.isEmpty()) { 270 RemoteTransaction remoteTransaction = null; 271 272 if (!resuming) { 273 VectorClock newVectorClock = findNewVectorClock(); 274 275 databaseVersion.setVectorClock(newVectorClock); 276 databaseVersion.setTimestamp(new Date()); 277 databaseVersion.setClient(config.getMachineName()); 278 279 remoteTransaction = new RemoteTransaction(config, transferManager); 280 281 // Add multichunks to transaction 282 logger.log(Level.INFO, "Uploading new multichunks ..."); 283 284 // This call adds newly changed chunks to a "RemoteTransaction", so they can be uploaded later. 285 addMultiChunksToTransaction(remoteTransaction, databaseVersion.getMultiChunks()); 286 } 287 else { 288 remoteTransaction = remoteTransactionsToResumeIterator.next(); 289 } 290 291 logger.log(Level.INFO, "Uploading database: " + databaseVersion); 292 293 // Create delta database and commit transaction 294 // The information about file changes is written to disk to locally "commit" the transaction. This 295 // enables Syncany to later resume the transaction if it is interrupted before completion. 296 writeAndAddDeltaDatabase(remoteTransaction, databaseVersion, resuming); 297 298 // This thread is to be run when the transaction is interrupted for connectivity reasons. It will serialize 299 // the transaction and metadata in memory such that the transaction can be resumed later. 300 Thread writeResumeFilesShutDownHook = createAndAddShutdownHook(remoteTransaction, databaseVersion); 301 302 // This performs the actual sync to the remote. It is executed synchronously. Only after the changes 303 // are confirmed to have been safely pushed to the remote, will the transaction be marked as complete. 304 if (!detectedFailure) { 305 boolean committingFailed = true; 306 try { 307 if (transactionRemoteFileToResume == null) { 308 remoteTransaction.commit(); 309 } 310 else { 311 remoteTransaction.commit(config.getTransactionFile(), transactionRemoteFileToResume); 312 transactionRemoteFileToResume = null; 313 } 314 315 logger.log(Level.INFO, "Persisting local SQL database (new database version {0}) ...", databaseVersion.getHeader().toString()); 316 long newDatabaseVersionId = localDatabase.writeDatabaseVersion(databaseVersion); 317 318 logger.log(Level.INFO, "Removing DIRTY database versions from database ..."); 319 localDatabase.removeDirtyDatabaseVersions(newDatabaseVersionId); 320 321 logger.log(Level.INFO, "Adding database version to result changes:" + databaseVersion); 322 addNewDatabaseChangesToResultChanges(databaseVersion, result.getChangeSet()); 323 324 result.incrementTransactionsCompleted(); 325 326 327 logger.log(Level.INFO, "Committing local database."); 328 localDatabase.commit(); 329 330 committingFailed = false; 331 } 332 catch (Exception e) { 333 detectedFailure = true; 334 caughtFailure = e; 335 } 336 finally { 337 // The JVM has not shut down, so we can remove the shutdown hook. 338 // If it turns out that committing has failed, we run it explicitly. 339 removeShutdownHook(writeResumeFilesShutDownHook); 340 341 if (committingFailed) { 342 remainingRemoteTransactions.add(remoteTransaction); 343 remainingDatabaseVersions.add(databaseVersion); 344 } 345 } 346 } 347 else { 348 remainingRemoteTransactions.add(remoteTransaction); 349 remainingDatabaseVersions.add(databaseVersion); 350 } 351 352 if (!noDatabaseVersions) { 353 logger.log(Level.FINE, "Waiting for new database version."); 354 databaseVersion = databaseVersionQueue.take(); 355 logger.log(Level.FINE, "Took new database version: " + databaseVersion); 356 } 357 else { 358 logger.log(Level.FINE, "Not waiting for new database version, last one has been taken."); 359 break; 360 } 361 362 } 363 364 if (detectedFailure) { 365 localDatabase.rollback(); 366 serializeRemoteTransactionsAndMetadata(remainingRemoteTransactions, remainingDatabaseVersions); 367 throw caughtFailure; 368 } 369 370 return (int) result.getTransactionsCompleted(); 371 } 372 373 private TransactionRemoteFile attemptResumeTransactionRemoteFile() throws StorageException, BlockingTransfersException { 374 TransactionRemoteFile transactionRemoteFile = null; 375 376 // They look for the matching transaction on the remote. 377 List<TransactionRemoteFile> transactions = transferManager.getTransactionsByClient(config.getMachineName()); 378 379 // If there are blocking transactions, they stop completely. 380 // Not sure yet what these blocking structures are. 381 if (transactions == null) { 382 // We have blocking transactions 383 stopBecauseOfBlockingTransactions(); 384 throw new BlockingTransfersException(); 385 } 386 387 // There is no sign of the transaction on the remote. Clean up the local transaction. 388 if (transactions.size() != 1) { 389 logger.log(Level.INFO, "Unable to find (unique) transactionRemoteFile. Not resuming."); 390 transferManager.clearResumableTransactions(); 391 } 392 // Remote transaction file found. 393 else { 394 transactionRemoteFile = transactions.get(0); 395 } 396 397 return transactionRemoteFile; 398 } 399 400 /** 401 * This method creates a Thread, which serializes the {@link remoteTransaction} in the state at the time the thread is run, 402 * as well as the {@link DatabaseVersion} that contains the metadata about what is uploaded in this transaction. 403 * 404 * @param newDatabaseVersion DatabaseVersion that contains everything that should be locally saved when current transaction is resumed. 405 * 406 * @return Thread which is attached as a shutdownHook. 407 */ 408 private Thread createAndAddShutdownHook(final RemoteTransaction remoteTransaction, final DatabaseVersion newDatabaseVersion) { 409 Thread writeResumeFilesShutDownHook = new Thread(new Runnable() { 410 @Override 411 public void run() { 412 serializeRemoteTransactionsAndMetadata(Arrays.asList(remoteTransaction), Arrays.asList(newDatabaseVersion)); 413 } 414 }, "ResumeShtdwn"); 415 416 logger.log(Level.INFO, "Adding shutdown hook (to allow resuming the upload) ..."); 417 418 Runtime.getRuntime().addShutdownHook(writeResumeFilesShutDownHook); 419 return writeResumeFilesShutDownHook; 420 } 421 422 private void removeShutdownHook(Thread writeResumeFilesShutDownHook) { 423 Runtime.getRuntime().removeShutdownHook(writeResumeFilesShutDownHook); 424 } 425 426 private void fireStartEvent() { 427 eventBus.post(new UpStartSyncExternalEvent(config.getLocalDir().getAbsolutePath())); 428 } 429 430 private void fireEndEvent() { 431 eventBus.post(new UpEndSyncExternalEvent(config.getLocalDir().getAbsolutePath(), result.getResultCode(), result.getChangeSet())); 432 } 433 434 /** 435 * This method sets the correct {@link UpResultCode} when another client has a transaction in progress with deletions. 436 */ 437 private void stopBecauseOfBlockingTransactions() throws StorageException { 438 logger.log(Level.INFO, "Another client is blocking the repo with unfinished cleanup."); 439 result.setResultCode(UpResultCode.NOK_REPO_BLOCKED); 440 441 finishOperation(); 442 fireEndEvent(); 443 } 444 445 /** 446 * This method checks if: 447 * 448 * <ul> 449 * <li>If there are local changes => No need for Up.</li> 450 * <li>If another clients is running Cleanup => Not allowed to upload.</li> 451 * <li>If remote changes exist => Should Down first.</li> 452 * </ul> 453 * 454 * @returns boolean true if Up can and should be done, false otherwise. 455 */ 456 private boolean checkPreconditions() throws Exception { 457 // Find local changes 458 StatusOperation statusOperation = new StatusOperation(config, options.getStatusOptions()); 459 StatusOperationResult statusOperationResult = statusOperation.execute(); 460 ChangeSet localChanges = statusOperationResult.getChangeSet(); 461 462 result.getStatusResult().setChangeSet(localChanges); 463 464 if (!localChanges.hasChanges()) { 465 logger.log(Level.INFO, "Local database is up-to-date (change set). NOTHING TO DO!"); 466 result.setResultCode(UpResultCode.OK_NO_CHANGES); 467 468 return false; 469 } 470 471 // Check if other operations are running 472 if (otherRemoteOperationsRunning(CleanupOperation.ACTION_ID)) { 473 logger.log(Level.INFO, "* Cleanup running. Skipping down operation."); 474 result.setResultCode(UpResultCode.NOK_UNKNOWN_DATABASES); 475 476 return false; 477 } 478 479 // Find remote changes (unless --force is enabled) 480 if (!options.forceUploadEnabled()) { 481 LsRemoteOperationResult lsRemoteOperationResult = new LsRemoteOperation(config, transferManager).execute(); 482 List<DatabaseRemoteFile> unknownRemoteDatabases = lsRemoteOperationResult.getUnknownRemoteDatabases(); 483 484 if (unknownRemoteDatabases.size() > 0) { 485 logger.log(Level.INFO, "There are remote changes. Call 'down' first or use --force-upload you must, Luke!"); 486 logger.log(Level.FINE, "Unknown remote databases are: " + unknownRemoteDatabases); 487 result.setResultCode(UpResultCode.NOK_UNKNOWN_DATABASES); 488 489 return false; 490 } 491 else { 492 logger.log(Level.INFO, "No remote changes, ready to upload."); 493 } 494 } 495 else { 496 logger.log(Level.INFO, "Force (--force-upload) is enabled, ignoring potential remote changes."); 497 } 498 499 return true; 500 } 501 502 /** 503 * This method takes the metadata that is to be uploaded, loads it into a {@link MemoryDatabase} and serializes 504 * it to a file. If this is not a resumption of a previous transaction, this file is added to the transaction. 505 * Finally, databaseversions that are uploaded are remembered as known, such that they are not downloaded in future Downs. 506 * 507 * @param newDatabaseVersion {@link DatabaseVersion} containing all metadata that would be locally persisted if the transaction succeeds. 508 * @param resuming boolean indicating if the current transaction is in the process of being resumed. 509 */ 510 private void writeAndAddDeltaDatabase(RemoteTransaction remoteTransaction, DatabaseVersion newDatabaseVersion, boolean resuming) 511 throws InterruptedException, StorageException, 512 IOException, 513 SQLException { 514 // Clone database version (necessary, because the original must not be touched) 515 DatabaseVersion deltaDatabaseVersion = newDatabaseVersion.clone(); 516 517 // New delta database 518 MemoryDatabase deltaDatabase = new MemoryDatabase(); 519 deltaDatabase.addDatabaseVersion(deltaDatabaseVersion); 520 521 // Save delta database locally 522 long newestLocalDatabaseVersion = getNewestDatabaseFileVersion(config.getMachineName(), localDatabase.getKnownDatabases()); 523 DatabaseRemoteFile remoteDeltaDatabaseFile = new DatabaseRemoteFile(config.getMachineName(), newestLocalDatabaseVersion + 1); 524 File localDeltaDatabaseFile = config.getCache().getDatabaseFile(remoteDeltaDatabaseFile.getName()); 525 526 logger.log(Level.INFO, "Saving local delta database, version {0} to file {1} ... ", new Object[] { deltaDatabaseVersion.getHeader(), 527 localDeltaDatabaseFile }); 528 529 saveDeltaDatabase(deltaDatabase, localDeltaDatabaseFile); 530 531 if (!resuming) { 532 // Upload delta database, if we are not resuming (in which case the db is in the transaction already) 533 logger.log(Level.INFO, "- Uploading local delta database file ..."); 534 addLocalDatabaseToTransaction(remoteTransaction, localDeltaDatabaseFile, remoteDeltaDatabaseFile); 535 } 536 // Remember uploaded database as known. 537 List<DatabaseRemoteFile> newDatabaseRemoteFiles = new ArrayList<DatabaseRemoteFile>(); 538 newDatabaseRemoteFiles.add(remoteDeltaDatabaseFile); 539 localDatabase.writeKnownRemoteDatabases(newDatabaseRemoteFiles); 540 } 541 542 /** 543 * Serializes a {@link MemoryDatabase} to a file, using the configured transformer. 544 */ 545 protected void saveDeltaDatabase(MemoryDatabase db, File localDatabaseFile) throws IOException { 546 logger.log(Level.INFO, "- Saving database to " + localDatabaseFile + " ..."); 547 548 DatabaseXmlSerializer dao = new DatabaseXmlSerializer(config.getTransformer()); 549 dao.save(db.getDatabaseVersions(), localDatabaseFile); 550 } 551 552 /** 553 * This methods iterates over all {@link DatabaseVersion}s that are dirty. Dirty means that they are not in the winning 554 * branch. All data which is contained in these dirty DatabaseVersions is added to the newDatabaseVersion, so that it 555 * is included in the new Up. Note that only metadata is reuploaded, the actual multichunks are still in the repository. 556 * 557 * @param newDatabaseVersion {@link DatabaseVersion} to which dirty data should be added. 558 */ 559 private void addDirtyData(DatabaseVersion newDatabaseVersion) { 560 Iterator<DatabaseVersion> dirtyDatabaseVersions = localDatabase.getDirtyDatabaseVersions(); 561 562 if (!dirtyDatabaseVersions.hasNext()) { 563 logger.log(Level.INFO, "No DIRTY data found in database (no dirty databases); Nothing to do here."); 564 } 565 else { 566 logger.log(Level.INFO, "Adding DIRTY data to new database version: "); 567 568 while (dirtyDatabaseVersions.hasNext()) { 569 DatabaseVersion dirtyDatabaseVersion = dirtyDatabaseVersions.next(); 570 571 logger.log(Level.INFO, "- Adding chunks/multichunks/filecontents from database version " + dirtyDatabaseVersion.getHeader()); 572 573 for (ChunkEntry chunkEntry : dirtyDatabaseVersion.getChunks()) { 574 newDatabaseVersion.addChunk(chunkEntry); 575 } 576 577 for (MultiChunkEntry multiChunkEntry : dirtyDatabaseVersion.getMultiChunks()) { 578 newDatabaseVersion.addMultiChunk(multiChunkEntry); 579 } 580 581 for (FileContent fileContent : dirtyDatabaseVersion.getFileContents()) { 582 newDatabaseVersion.addFileContent(fileContent); 583 } 584 } 585 } 586 } 587 588 /** 589 * This method extracts the files that are new or changed from a {@link ChangeSet} of local changes. 590 * 591 * @param localChanges {@link ChangeSet} that was the result from a StatusOperation. 592 * 593 * @return a list of Files that are new or have been changed. 594 */ 595 private List<File> extractLocallyUpdatedFiles(ChangeSet localChanges) { 596 List<File> locallyUpdatedFiles = new ArrayList<File>(); 597 598 for (String relativeFilePath : localChanges.getNewFiles()) { 599 locallyUpdatedFiles.add(new File(config.getLocalDir() + File.separator + relativeFilePath)); 600 } 601 602 for (String relativeFilePath : localChanges.getChangedFiles()) { 603 locallyUpdatedFiles.add(new File(config.getLocalDir() + File.separator + relativeFilePath)); 604 } 605 606 return locallyUpdatedFiles; 607 } 608 609 private List<File> extractLocallyDeletedFiles(ChangeSet localChanges) { 610 List<File> locallyDeletedFiles = new ArrayList<File>(); 611 612 for (String relativeFilePath : localChanges.getDeletedFiles()) { 613 locallyDeletedFiles.add(new File(config.getLocalDir() + File.separator + relativeFilePath)); 614 } 615 616 return locallyDeletedFiles; 617 } 618 619 /** 620 * This method fills a {@link ChangeSet} with the files and changes that are uploaded, to include in 621 * the {@link UpOperationResult}. 622 * 623 * @param newDatabaseVersion {@link DatabaseVersion} that contains the changes. 624 * @param resultChanges {@ChangeSet} to which these changes are to be added. 625 */ 626 private void addNewDatabaseChangesToResultChanges(DatabaseVersion newDatabaseVersion, ChangeSet resultChanges) { 627 for (PartialFileHistory partialFileHistory : newDatabaseVersion.getFileHistories()) { 628 FileVersion lastFileVersion = partialFileHistory.getLastVersion(); 629 630 switch (lastFileVersion.getStatus()) { 631 case NEW: 632 resultChanges.getNewFiles().add(lastFileVersion.getPath()); 633 break; 634 635 case CHANGED: 636 case RENAMED: 637 resultChanges.getChangedFiles().add(lastFileVersion.getPath()); 638 break; 639 640 case DELETED: 641 resultChanges.getDeletedFiles().add(lastFileVersion.getPath()); 642 break; 643 } 644 } 645 } 646 647 /** 648 * This methods adds the multichunks that are not yet present in the remote repo to the {@link RemoteTransaction} for 649 * uploading. Multichunks are not uploaded if they are dirty. 650 * 651 * @param multiChunkEntries Collection of multiChunkEntries that are included in the new {@link DatabaseVersion} 652 */ 653 private void addMultiChunksToTransaction(RemoteTransaction remoteTransaction, Collection<MultiChunkEntry> multiChunksEntries) 654 throws InterruptedException, StorageException { 655 List<MultiChunkId> dirtyMultiChunkIds = localDatabase.getDirtyMultiChunkIds(); 656 657 for (MultiChunkEntry multiChunkEntry : multiChunksEntries) { 658 if (dirtyMultiChunkIds.contains(multiChunkEntry.getId())) { 659 logger.log(Level.INFO, "- Ignoring multichunk (from dirty database, already uploaded), " + multiChunkEntry.getId() + " ..."); 660 } 661 else { 662 File localMultiChunkFile = config.getCache().getEncryptedMultiChunkFile(multiChunkEntry.getId()); 663 MultichunkRemoteFile remoteMultiChunkFile = new MultichunkRemoteFile(multiChunkEntry.getId()); 664 665 logger.log(Level.INFO, "- Uploading multichunk {0} from {1} to {2} ...", new Object[] { multiChunkEntry.getId(), localMultiChunkFile, 666 remoteMultiChunkFile }); 667 668 remoteTransaction.upload(localMultiChunkFile, remoteMultiChunkFile); 669 } 670 } 671 } 672 673 private void addLocalDatabaseToTransaction(RemoteTransaction remoteTransaction, File localDatabaseFile, DatabaseRemoteFile remoteDatabaseFile) 674 throws InterruptedException, 675 StorageException { 676 677 logger.log(Level.INFO, "- Uploading " + localDatabaseFile + " to " + remoteDatabaseFile + " ..."); 678 remoteTransaction.upload(localDatabaseFile, remoteDatabaseFile); 679 } 680 681 /** 682 * Finds the next vector clock 683 * 684 * <p>There are two causes for not having a previous vector clock: 685 * <ul> 686 * <li>This is the initial version 687 * <li>A cleanup has wiped *all* database versions 688 * </ul> 689 * 690 * In the latter case, the method looks at the previous database version numbers 691 * to determine a new vector clock 692 */ 693 private VectorClock findNewVectorClock() { 694 // Get last vector clock 695 DatabaseVersionHeader lastDatabaseVersionHeader = localDatabase.getLastDatabaseVersionHeader(); 696 VectorClock lastVectorClock = (lastDatabaseVersionHeader != null) ? lastDatabaseVersionHeader.getVectorClock() : new VectorClock(); 697 698 logger.log(Level.INFO, "Last vector clock was: " + lastVectorClock); 699 700 boolean noPreviousVectorClock = lastVectorClock.isEmpty(); 701 702 if (noPreviousVectorClock) { 703 lastVectorClock = localDatabase.getHighestKnownDatabaseFilenameNumbers(); 704 } 705 706 VectorClock newVectorClock = lastVectorClock.clone(); 707 708 Long lastLocalValue = lastVectorClock.getClock(config.getMachineName()); 709 Long lastDirtyLocalValue = localDatabase.getMaxDirtyVectorClock(config.getMachineName()); 710 711 Long newLocalValue = null; 712 713 if (lastDirtyLocalValue != null) { 714 newLocalValue = lastDirtyLocalValue + 1; 715 } 716 else { 717 if (lastLocalValue != null) { 718 newLocalValue = lastLocalValue + 1; 719 } 720 else { 721 newLocalValue = 1L; 722 } 723 } 724 725 newVectorClock.setClock(config.getMachineName(), newLocalValue); 726 727 return newVectorClock; 728 } 729 730 private Collection<RemoteTransaction> attemptResumeTransactions(Collection<Long> versions) { 731 try { 732 Collection<RemoteTransaction> remoteTransactions = new ArrayList<>(); 733 734 for (Long version : versions) { 735 File transactionFile = config.getTransactionFile(version); 736 737 // If a single transaction file is missing, we should restart 738 if (!transactionFile.exists()) { 739 return null; 740 } 741 742 TransactionTO transactionTO = TransactionTO.load(null, transactionFile); 743 744 // Verify if all files needed are in cache. 745 for (ActionTO action : transactionTO.getActions()) { 746 if (action.getType() == ActionType.UPLOAD) { 747 if (action.getStatus() == ActionStatus.UNSTARTED) { 748 if (!action.getLocalTempLocation().exists()) { 749 // Unstarted upload has no cached local copy, abort 750 return null; 751 } 752 } 753 } 754 } 755 756 remoteTransactions.add(new RemoteTransaction(config, transferManager, transactionTO)); 757 } 758 759 return remoteTransactions; 760 } catch (Exception e) { 761 logger.log(Level.WARNING, "Invalid transaction file. Cannot resume!"); 762 return null; 763 } 764 } 765 766 private Collection<DatabaseVersion> attemptResumeDatabaseVersions(Collection<Long> versions) throws Exception { 767 try { 768 Collection<DatabaseVersion> databaseVersions = new ArrayList<>(); 769 770 for (Long version : versions) { 771 File databaseFile = config.getTransactionDatabaseFile(version); 772 773 // If a single database file is missing, we should restart 774 if (!databaseFile.exists()) { 775 return null; 776 } 777 778 DatabaseXmlSerializer databaseSerializer = new DatabaseXmlSerializer(); 779 MemoryDatabase memoryDatabase = new MemoryDatabase(); 780 databaseSerializer.load(memoryDatabase, databaseFile, null, null, DatabaseReadType.FULL); 781 782 if (memoryDatabase.getDatabaseVersions().size() == 0) { 783 return null; 784 } 785 786 databaseVersions.add(memoryDatabase.getLastDatabaseVersion()); 787 } 788 789 return databaseVersions; 790 } catch (Exception e) { 791 logger.log(Level.WARNING, "Cannot load database versions from 'state'. Cannot resume."); 792 return null; 793 } 794 } 795 796 /** 797 * Serializes both the remote transaction and the current database version 798 * that would be added if Up was successful. 799 * @param newDatabaseVersion the current metadata 800 */ 801 private void serializeRemoteTransactionsAndMetadata(List<RemoteTransaction> remoteTransactions, List<DatabaseVersion> newDatabaseVersions) { 802 try { 803 logger.log(Level.INFO, "Persisting status of UpOperation to " + config.getStateDir() + " ..."); 804 805 // Collect a list of all database version numbers that will be saved 806 List<Long> databaseVersionClocks = new ArrayList<>(); 807 for (int i = 0; i < remoteTransactions.size(); i++) { 808 DatabaseVersion databaseVersion = newDatabaseVersions.get(i); 809 long databaseVersionClock = databaseVersion.getVectorClock().getClock(config.getMachineName()); 810 databaseVersionClocks.add(databaseVersionClock); 811 } 812 813 // Write the list of version number to a file, before serializing any transactions! 814 // This ensures that no transaction files can exist without a "reference" to them. 815 File transactionListFile = config.getTransactionListFile(); 816 PrintWriter transactionListWriter = new PrintWriter(new OutputStreamWriter( 817 new FileOutputStream(transactionListFile), "UTF-8")); 818 for (Long databaseVersion : databaseVersionClocks) { 819 transactionListWriter.println(databaseVersion); 820 } 821 transactionListWriter.close(); 822 823 // For each database version write the transaction and database files 824 for (int i = 0; i < remoteTransactions.size(); i++) { 825 DatabaseVersion databaseVersion = newDatabaseVersions.get(i); 826 long databaseVersionClock = databaseVersionClocks.get(i); 827 828 // Writing transaction file to state dir 829 remoteTransactions.get(i).writeToFile(null, config.getTransactionFile(databaseVersionClock)); 830 831 // Writing database representation of new database version to state dir 832 MemoryDatabase memoryDatabase = new MemoryDatabase(); 833 memoryDatabase.addDatabaseVersion(databaseVersion); 834 835 DatabaseXmlSerializer dao = new DatabaseXmlSerializer(); 836 dao.save(memoryDatabase.getDatabaseVersions(), config.getTransactionDatabaseFile(databaseVersionClock)); 837 } 838 839 // The first transaction may be resumable, so write it to the default transaction file 840 remoteTransactions.get(0).writeToFile(null, config.getTransactionFile()); 841 } 842 catch (Exception e) { 843 logger.log(Level.WARNING, "Failure when persisting status of Up: ", e); 844 } 845 } 846}