001package co.codewizards.cloudstore.core.repo.sync; 002 003import static co.codewizards.cloudstore.core.objectfactory.ObjectFactoryUtil.*; 004import static co.codewizards.cloudstore.core.oio.OioFileFactory.*; 005import static co.codewizards.cloudstore.core.util.AssertUtil.*; 006import static co.codewizards.cloudstore.core.util.HashUtil.*; 007import static co.codewizards.cloudstore.core.util.Util.*; 008 009import java.net.MalformedURLException; 010import java.net.URL; 011import java.util.ArrayList; 012import java.util.Collection; 013import java.util.HashMap; 014import java.util.HashSet; 015import java.util.Iterator; 016import java.util.List; 017import java.util.Map; 018import java.util.Set; 019import java.util.SortedMap; 020import java.util.TreeMap; 021import java.util.UUID; 022import java.util.concurrent.Callable; 023import java.util.concurrent.ExecutorService; 024import java.util.concurrent.Executors; 025import java.util.concurrent.Future; 026 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030import co.codewizards.cloudstore.core.dto.ChangeSetDto; 031import co.codewizards.cloudstore.core.dto.ConfigPropSetDto; 032import co.codewizards.cloudstore.core.dto.CopyModificationDto; 033import co.codewizards.cloudstore.core.dto.DeleteModificationDto; 034import co.codewizards.cloudstore.core.dto.DirectoryDto; 035import co.codewizards.cloudstore.core.dto.FileChunkDto; 036import co.codewizards.cloudstore.core.dto.ModificationDto; 037import co.codewizards.cloudstore.core.dto.NormalFileDto; 038import co.codewizards.cloudstore.core.dto.RepoFileDto; 039import co.codewizards.cloudstore.core.dto.RepoFileDtoTreeNode; 040import co.codewizards.cloudstore.core.dto.RepositoryDto; 041import co.codewizards.cloudstore.core.dto.SymlinkDto; 042import co.codewizards.cloudstore.core.dto.VersionInfoDto; 043import co.codewizards.cloudstore.core.oio.File; 044import co.codewizards.cloudstore.core.progress.ProgressMonitor; 045import co.codewizards.cloudstore.core.progress.SubProgressMonitor; 046import co.codewizards.cloudstore.core.repo.local.LocalRepoHelper; 047import co.codewizards.cloudstore.core.repo.local.LocalRepoManager; 048import co.codewizards.cloudstore.core.repo.local.LocalRepoManagerFactory; 049import co.codewizards.cloudstore.core.repo.transport.CollisionException; 050import co.codewizards.cloudstore.core.repo.transport.LocalRepoTransport; 051import co.codewizards.cloudstore.core.repo.transport.RepoTransport; 052import co.codewizards.cloudstore.core.repo.transport.RepoTransportFactory; 053import co.codewizards.cloudstore.core.repo.transport.RepoTransportFactoryRegistry; 054import co.codewizards.cloudstore.core.repo.transport.TransferDoneMarkerType; 055import co.codewizards.cloudstore.core.util.UrlUtil; 056import co.codewizards.cloudstore.core.version.VersionCompatibilityValidator; 057 058/** 059 * Logic for synchronising a local with a remote repository. 060 * @author Marco หงุ่ยตระกูล-Schulze - marco at codewizards dot co 061 */ 062public class RepoToRepoSync implements AutoCloseable { 063 private static final Logger logger = LoggerFactory.getLogger(RepoToRepoSync.class); 064 065 /** 066 * Sync in the inverse direction. This is only for testing whether the RepoTransport implementations 067 * are truly symmetric. It is less efficient! Therefore, this must NEVER be true in production!!! 068 */ 069 private static final boolean TEST_INVERSE = false; 070 071 protected final File localRoot; 072 protected final URL remoteRoot; 073 protected final LocalRepoManager localRepoManager; 074 protected final LocalRepoTransport localRepoTransport; 075 protected final RepoTransport remoteRepoTransport; 076 protected final UUID localRepositoryId; 077 protected final UUID remoteRepositoryId; 078 079 private ExecutorService localSyncExecutor; 080 private Future<Void> localSyncFuture; 081 private final Set<UUID> lastSyncToRemoteRepoLocalRepositoryRevisionSyncedUpdatedInFromRepositoryIds = new HashSet<>(); 082 083 /** 084 * Create an instance. 085 * @param localRoot the root of the local repository or any file/directory inside it. This is 086 * automatically adjusted to fit the connection-point to the remote repository (the remote 087 * repository might be connected to a sub-directory). 088 * @param remoteRoot the root of the remote repository. This must exactly match the connection point. 089 * If a sub-directory of the remote repository is connected to the local repository, this sub-directory 090 * must be referenced here. 091 */ 092 protected RepoToRepoSync(File localRoot, final URL remoteRoot) { 093 final File localRootWithoutPathPrefix = LocalRepoHelper.getLocalRootContainingFile(assertNotNull(localRoot, "localRoot")); 094 this.remoteRoot = UrlUtil.canonicalizeURL(assertNotNull(remoteRoot, "remoteRoot")); 095 localRepoManager = LocalRepoManagerFactory.Helper.getInstance().createLocalRepoManagerForExistingRepository(localRootWithoutPathPrefix); 096 this.localRoot = localRoot = createFile(localRootWithoutPathPrefix, localRepoManager.getLocalPathPrefixOrFail(remoteRoot)); 097 098 localRepositoryId = localRepoManager.getRepositoryId(); 099 if (localRepositoryId == null) 100 throw new IllegalStateException("localRepoManager.getRepositoryId() returned null!"); 101 102 remoteRepositoryId = localRepoManager.getRemoteRepositoryIdOrFail(remoteRoot); 103 104 remoteRepoTransport = createRepoTransport(remoteRoot, localRepositoryId); 105 localRepoTransport = (LocalRepoTransport) createRepoTransport(localRoot, remoteRepositoryId); 106 } 107 108 public static RepoToRepoSync create(final File localRoot, final URL remoteRoot) { 109 return createObject(RepoToRepoSync.class, localRoot, remoteRoot); 110 } 111 112 public void sync(final ProgressMonitor monitor) { 113 assertNotNull(monitor, "monitor"); 114 monitor.beginTask("Synchronising...", 201); 115 try { 116 lastSyncToRemoteRepoLocalRepositoryRevisionSyncedUpdatedInFromRepositoryIds.clear(); 117 final VersionInfoDto clientVersionInfoDto = localRepoTransport.getVersionInfoDto(); 118 final VersionInfoDto serverVersionInfoDto = remoteRepoTransport.getVersionInfoDto(); 119 VersionCompatibilityValidator.getInstance().validate(clientVersionInfoDto, serverVersionInfoDto); 120 121 readRemoteRepositoryIdFromRepoTransport(); 122 monitor.worked(1); 123 124 if (localSyncExecutor != null) 125 throw new IllegalStateException("localSyncExecutor != null"); 126 127 if (localSyncFuture != null) 128 throw new IllegalStateException("localSyncFuture != null"); 129 130 localSyncExecutor = Executors.newFixedThreadPool(1); 131 localSyncFuture = localSyncExecutor.submit(new Callable<Void>() { 132 @Override 133 public Void call() throws Exception { 134 logger.info("sync: locally syncing {} ('{}')", localRepositoryId, localRoot); 135 localRepoManager.localSync(new SubProgressMonitor(monitor, 50)); 136 return null; 137 } 138 }); 139 140 if (!TEST_INVERSE) { // This is the normal sync (NOT test). 141 syncDown(true, new SubProgressMonitor(monitor, 50)); 142 143 if (localSyncExecutor != null) 144 throw new IllegalStateException("localSyncExecutor != null"); 145 146 if (localSyncFuture != null) 147 throw new IllegalStateException("localSyncFuture != null"); 148 149 syncUp(new SubProgressMonitor(monitor, 50)); 150 // Immediately sync back to make sure the changes we caused don't cause problems later 151 // (right now there's very likely no collision and this should be very fast). 152 syncDown(false, new SubProgressMonitor(monitor, 50)); 153 } 154 else { // THIS IS FOR TESTING ONLY! 155 logger.info("sync: locally syncing on *remote* side {} ('{}')", localRepositoryId, localRoot); 156 remoteRepoTransport.getChangeSetDto(true, null); // trigger the local sync on the remote side (we don't need the change set) 157 158 waitForAndCheckLocalSyncFuture(); 159 160 syncUp(new SubProgressMonitor(monitor, 50)); 161 syncDown(false, new SubProgressMonitor(monitor, 50)); 162 syncUp(new SubProgressMonitor(monitor, 50)); 163 } 164 } finally { 165 monitor.done(); 166 } 167 } 168 169 protected void syncUp(final ProgressMonitor monitor) { 170 logger.info("syncUp: fromID={} from='{}' toID={} to='{}'", 171 localRepositoryId, localRoot, remoteRepositoryId, remoteRoot); 172 sync(localRepoTransport, false, remoteRepoTransport, monitor); 173 } 174 175 protected void syncDown(final boolean fromRepoLocalSync, final ProgressMonitor monitor) { 176 logger.info("syncDown: fromID={} from='{}' toID={} to='{}', fromRepoLocalSync={}", 177 remoteRepositoryId, remoteRoot, localRepositoryId, localRoot, fromRepoLocalSync); 178 sync(remoteRepoTransport, fromRepoLocalSync, localRepoTransport, monitor); 179 } 180 181 private void waitForAndCheckLocalSyncFutureIfExists() { 182 if (localSyncFuture != null) 183 waitForAndCheckLocalSyncFuture(); 184 } 185 186 private void waitForAndCheckLocalSyncFuture() { 187 try { 188 assertNotNull(localSyncFuture, "localSyncFuture").get(); 189 } catch (final RuntimeException e) { 190 throw e; 191 } catch (final Exception e) { 192 throw new RuntimeException(e); 193 } 194 assertNotNull(localSyncExecutor, "localSyncExecutor").shutdown(); 195 localSyncFuture = null; 196 localSyncExecutor = null; 197 } 198 199 private void readRemoteRepositoryIdFromRepoTransport() { 200 final UUID repositoryId = remoteRepoTransport.getRepositoryId(); 201 if (repositoryId == null) 202 throw new IllegalStateException("remoteRepoTransport.getRepositoryId() returned null!"); 203 204 if (!repositoryId.equals(remoteRepositoryId)) 205 throw new IllegalStateException( 206 String.format("remoteRepoTransport.getRepositoryId() does not match repositoryId in local DB! %s != %s", repositoryId, remoteRepositoryId)); 207 } 208 209 private RepoTransport createRepoTransport(final File rootFile, final UUID clientRepositoryId) { 210 URL rootURL; 211 try { 212 rootURL = rootFile.toURI().toURL(); 213 } catch (final MalformedURLException e) { 214 throw new RuntimeException(e); 215 } 216 return createRepoTransport(rootURL, clientRepositoryId); 217 } 218 219 private RepoTransport createRepoTransport(final URL remoteRoot, final UUID clientRepositoryId) { 220 final RepoTransportFactory repoTransportFactory = RepoTransportFactoryRegistry.getInstance().getRepoTransportFactoryOrFail(remoteRoot); 221 return repoTransportFactory.createRepoTransport(remoteRoot, clientRepositoryId); 222 } 223 224 protected void sync(final RepoTransport fromRepoTransport, final boolean fromRepoLocalSync, final RepoTransport toRepoTransport, final ProgressMonitor monitor) { 225 monitor.beginTask("Synchronising...", 100); 226 try { 227 Long lastSyncToRemoteRepoLocalRepositoryRevisionSynced = null; 228 if (lastSyncToRemoteRepoLocalRepositoryRevisionSyncedUpdatedInFromRepositoryIds.add(fromRepoTransport.getRepositoryId())) { 229 RepositoryDto clientRepositoryDto = toRepoTransport.getClientRepositoryDto(); 230 assertNotNull(clientRepositoryDto, "clientRepositoryDto"); 231 lastSyncToRemoteRepoLocalRepositoryRevisionSynced = clientRepositoryDto.getRevision() < 0 ? null : clientRepositoryDto.getRevision(); 232 } 233 234 final ChangeSetDto changeSetDto = fromRepoTransport.getChangeSetDto(fromRepoLocalSync, lastSyncToRemoteRepoLocalRepositoryRevisionSynced); 235 monitor.worked(8); 236 237 waitForAndCheckLocalSyncFutureIfExists(); 238 toRepoTransport.prepareForChangeSetDto(changeSetDto); 239 sync(fromRepoTransport, toRepoTransport, changeSetDto, new SubProgressMonitor(monitor, 90)); 240 241 fromRepoTransport.endSyncFromRepository(); 242 toRepoTransport.endSyncToRepository(changeSetDto.getRepositoryDto().getRevision()); 243 monitor.worked(2); 244 } finally { 245 monitor.done(); 246 } 247 } 248 249 protected void sync(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 250 final ChangeSetDto changeSetDto, final ProgressMonitor monitor) { 251 monitor.beginTask("Synchronising...", 1 + changeSetDto.getModificationDtos().size() + 3 * changeSetDto.getRepoFileDtos().size() + 1); 252 try { 253 syncParentConfigPropSetDto(fromRepoTransport, toRepoTransport, changeSetDto.getParentConfigPropSetDto(), 254 new SubProgressMonitor(monitor, 1)); 255 256 final RepoFileDtoTreeNode repoFileDtoTree = RepoFileDtoTreeNode.createTree(changeSetDto.getRepoFileDtos()); 257 if (repoFileDtoTree != null) { 258 sync(fromRepoTransport, toRepoTransport, repoFileDtoTree, 259 new Class<?>[] { DirectoryDto.class }, new Class<?>[0], false, 260 new SubProgressMonitor(monitor, repoFileDtoTree.size())); 261 } 262 263 syncModifications(fromRepoTransport, toRepoTransport, changeSetDto.getModificationDtos(), 264 new SubProgressMonitor(monitor, changeSetDto.getModificationDtos().size())); 265 266 if (repoFileDtoTree != null) { 267 sync(fromRepoTransport, toRepoTransport, repoFileDtoTree, 268 new Class<?>[] { RepoFileDto.class }, new Class<?>[] { DirectoryDto.class }, true, 269 new SubProgressMonitor(monitor, repoFileDtoTree.size())); 270 } 271 272 if (repoFileDtoTree != null) { 273 sync(fromRepoTransport, toRepoTransport, repoFileDtoTree, 274 new Class<?>[] { RepoFileDto.class }, new Class<?>[] { DirectoryDto.class }, false, 275 new SubProgressMonitor(monitor, repoFileDtoTree.size())); 276 } 277 } finally { 278 monitor.done(); 279 } 280 } 281 282 protected void syncParentConfigPropSetDto(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 283 final ConfigPropSetDto parentConfigPropSetDto, final ProgressMonitor monitor) { 284 assertNotNull(fromRepoTransport, "fromRepoTransport"); 285 assertNotNull(toRepoTransport, "toRepoTransport"); 286 // parentConfigPropSetDto may be null! 287 assertNotNull(monitor, "monitor"); 288 289 monitor.beginTask("Synchronising parent-config...", 1); 290 try { 291 if (parentConfigPropSetDto == null) 292 return; 293 294 toRepoTransport.putParentConfigPropSetDto(parentConfigPropSetDto); 295 } finally { 296 monitor.done(); 297 } 298 } 299 300 protected void sync(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 301 final RepoFileDtoTreeNode repoFileDtoTree, 302 final Class<?>[] repoFileDtoClassesIncl, final Class<?>[] repoFileDtoClassesExcl, final boolean filesInProgressOnly, 303 final ProgressMonitor monitor) { 304 assertNotNull(fromRepoTransport, "fromRepoTransport"); 305 assertNotNull(toRepoTransport, "toRepoTransport"); 306 assertNotNull(repoFileDtoTree, "repoFileDtoTree"); 307 assertNotNull(repoFileDtoClassesIncl, "repoFileDtoClassesIncl"); 308 assertNotNull(repoFileDtoClassesExcl, "repoFileDtoClassesExcl"); 309 assertNotNull(monitor, "monitor"); 310 311 final Map<Class<?>, Boolean> repoFileDtoClass2Included = new HashMap<Class<?>, Boolean>(); 312 final Map<Class<?>, Boolean> repoFileDtoClass2Excluded = new HashMap<Class<?>, Boolean>(); 313 314 final Set<String> fileInProgressPaths = filesInProgressOnly 315 ? localRepoTransport.getFileInProgressPaths(fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId()) 316 : null; 317 318 monitor.beginTask("Synchronising...", repoFileDtoTree.size()); 319 try { 320 for (final RepoFileDtoTreeNode repoFileDtoTreeNode : repoFileDtoTree) { 321 if (repoFileDtoTreeNode.getRepoFileDto().isNeededAsParent()) { // not actually modified - serves only to complete the tree structure. 322 monitor.worked(1); 323 continue; 324 } 325 326 if (fileInProgressPaths != null && ! fileInProgressPaths.contains(repoFileDtoTreeNode.getPath())) { 327 monitor.worked(1); 328 continue; 329 } 330 final RepoFileDto repoFileDto = repoFileDtoTreeNode.getRepoFileDto(); 331 final Class<? extends RepoFileDto> repoFileDtoClass = repoFileDto.getClass(); 332 333 Boolean included = repoFileDtoClass2Included.get(repoFileDtoClass); 334 if (included == null) { 335 included = false; 336 for (final Class<?> clazz : repoFileDtoClassesIncl) { 337 if (clazz.isAssignableFrom(repoFileDtoClass)) { 338 included = true; 339 break; 340 } 341 } 342 repoFileDtoClass2Included.put(repoFileDtoClass, included); 343 } 344 345 Boolean excluded = repoFileDtoClass2Excluded.get(repoFileDtoClass); 346 if (excluded == null) { 347 excluded = false; 348 for (final Class<?> clazz : repoFileDtoClassesExcl) { 349 if (clazz.isAssignableFrom(repoFileDtoClass)) { 350 excluded = true; 351 break; 352 } 353 } 354 repoFileDtoClass2Excluded.put(repoFileDtoClass, excluded); 355 } 356 357 if (!included || excluded) { 358 monitor.worked(1); 359 continue; 360 } 361 362 if (isDone(fromRepoTransport, toRepoTransport, repoFileDto)) { 363 logger.debug("sync: Skipping file already done in an interrupted transfer before: {}", repoFileDtoTreeNode.getPath()); 364 monitor.worked(1); 365 continue; 366 } 367 368 if (repoFileDto instanceof DirectoryDto) 369 syncDirectory(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, (DirectoryDto) repoFileDto, new SubProgressMonitor(monitor, 1)); 370 else if (repoFileDto instanceof NormalFileDto) { 371 syncFile(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, repoFileDto, monitor); 372 } 373 else if (repoFileDto instanceof SymlinkDto) 374 syncSymlink(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, (SymlinkDto) repoFileDto, new SubProgressMonitor(monitor, 1)); 375 else 376 throw new IllegalStateException("Unsupported RepoFileDto type: " + repoFileDto); 377 378 markDone(fromRepoTransport, toRepoTransport, repoFileDto); 379 } 380 } finally { 381 monitor.done(); 382 } 383 } 384 385 private boolean isDone(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final RepoFileDto repoFileDto) { 386 return localRepoTransport.isTransferDone( 387 fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId(), 388 TransferDoneMarkerType.REPO_FILE, repoFileDto.getId(), repoFileDto.getLocalRevision()); 389 } 390 391 private void markDone(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final RepoFileDto repoFileDto) { 392 localRepoTransport.markTransferDone( 393 fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId(), 394 TransferDoneMarkerType.REPO_FILE, repoFileDto.getId(), repoFileDto.getLocalRevision()); 395 } 396 397 private boolean isDone(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final ModificationDto modificationDto) { 398 return localRepoTransport.isTransferDone( 399 fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId(), 400 TransferDoneMarkerType.MODIFICATION, modificationDto.getId(), modificationDto.getLocalRevision()); 401 } 402 403 private void markDone(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final ModificationDto modificationDto) { 404 localRepoTransport.markTransferDone( 405 fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId(), 406 TransferDoneMarkerType.MODIFICATION, modificationDto.getId(), modificationDto.getLocalRevision()); 407 } 408 409 private SortedMap<Long, Collection<ModificationDto>> getLocalRevision2ModificationDtos(final Collection<ModificationDto> modificationDtos) { 410 final SortedMap<Long, Collection<ModificationDto>> map = new TreeMap<Long, Collection<ModificationDto>>(); 411 for (final ModificationDto modificationDto : modificationDtos) { 412 final long localRevision = modificationDto.getLocalRevision(); 413 Collection<ModificationDto> collection = map.get(localRevision); 414 if (collection == null) { 415 collection = new ArrayList<ModificationDto>(); 416 map.put(localRevision, collection); 417 } 418 collection.add(modificationDto); 419 } 420 return map; 421 } 422 423 private void syncModifications(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final Collection<ModificationDto> modificationDtos, final ProgressMonitor monitor) { 424 monitor.beginTask("Synchronising...", modificationDtos.size()); 425 try { 426 final SortedMap<Long,Collection<ModificationDto>> localRevision2ModificationDtos = getLocalRevision2ModificationDtos(modificationDtos); 427 for (final Map.Entry<Long,Collection<ModificationDto>> me : localRevision2ModificationDtos.entrySet()) { 428 final ModificationDtoSet modificationDtoSet = new ModificationDtoSet(me.getValue()); 429 430 for (final List<CopyModificationDto> copyModificationDtos : modificationDtoSet.getFromPath2CopyModificationDtos().values()) { 431 for (final Iterator<CopyModificationDto> itCopyMod = copyModificationDtos.iterator(); itCopyMod.hasNext(); ) { 432 final CopyModificationDto copyModificationDto = itCopyMod.next(); 433 434 if (isDone(fromRepoTransport, toRepoTransport, copyModificationDto)) { 435 logger.debug("sync: Skipping CopyModificaton already done in an interrupted transfer before: {} => {}", copyModificationDto.getFromPath(), copyModificationDto.getToPath()); 436 monitor.worked(1); 437 continue; 438 } 439 440 final List<DeleteModificationDto> deleteModificationDtos = modificationDtoSet.getPath2DeleteModificationDtos().get(copyModificationDto.getFromPath()); 441 boolean moveInstead = false; 442 if (!itCopyMod.hasNext() && deleteModificationDtos != null && !deleteModificationDtos.isEmpty()) 443 moveInstead = true; 444 445 if (moveInstead) { 446 logger.info("syncModifications: Moving from '{}' to '{}'", copyModificationDto.getFromPath(), copyModificationDto.getToPath()); 447 toRepoTransport.move(copyModificationDto.getFromPath(), copyModificationDto.getToPath()); 448 } 449 else { 450 logger.info("syncModifications: Copying from '{}' to '{}'", copyModificationDto.getFromPath(), copyModificationDto.getToPath()); 451 toRepoTransport.copy(copyModificationDto.getFromPath(), copyModificationDto.getToPath()); 452 } 453 454 if (!moveInstead && deleteModificationDtos != null) { 455 for (final DeleteModificationDto deleteModificationDto : deleteModificationDtos) { 456 logger.info("syncModifications: Deleting '{}'", deleteModificationDto.getPath()); 457 applyDeleteModification(fromRepoTransport, toRepoTransport, deleteModificationDto); 458 } 459 } 460 461 markDone(fromRepoTransport, toRepoTransport, copyModificationDto); 462 } 463 } 464 465 for (final List<DeleteModificationDto> deleteModificationDtos : modificationDtoSet.getPath2DeleteModificationDtos().values()) { 466 for (final DeleteModificationDto deleteModificationDto : deleteModificationDtos) { 467 if (isDone(fromRepoTransport, toRepoTransport, deleteModificationDto)) { 468 logger.debug("sync: Skipping DeleteModificaton already done in an interrupted transfer before: {}", deleteModificationDto.getPath()); 469 monitor.worked(1); 470 continue; 471 } 472 473 logger.info("syncModifications: Deleting '{}'", deleteModificationDto.getPath()); 474 applyDeleteModification(fromRepoTransport, toRepoTransport, deleteModificationDto); 475 476 markDone(fromRepoTransport, toRepoTransport, deleteModificationDto); 477 } 478 } 479 } 480 } finally { 481 monitor.done(); 482 } 483 } 484 485 protected void applyDeleteModification(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final DeleteModificationDto deleteModificationDto) { 486 assertNotNull(fromRepoTransport, "fromRepoTransport"); 487 assertNotNull(toRepoTransport, "toRepoTransport"); 488 assertNotNull(deleteModificationDto, "deleteModificationDto"); 489 490 try { 491 delete(fromRepoTransport, toRepoTransport, deleteModificationDto); 492 } catch (final CollisionException x) { // Note: This cannot happen in CloudStore! But in can happen in downstream projects with different RepoTransport implementations! 493 logger.info("CollisionException during delete: {}", deleteModificationDto.getPath()); 494 if (logger.isDebugEnabled()) 495 logger.debug(x.toString(), x); 496 497 return; 498 } 499 } 500 501 protected void delete(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final DeleteModificationDto deleteModificationDto) { 502 toRepoTransport.delete(deleteModificationDto.getPath()); 503 } 504 505 private void syncDirectory( 506 final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 507 final RepoFileDtoTreeNode repoFileDtoTreeNode, final DirectoryDto directoryDto, final ProgressMonitor monitor) { 508 monitor.beginTask("Synchronising...", 100); 509 try { 510 final String path = repoFileDtoTreeNode.getPath(); 511 logger.info("syncDirectory: path='{}'", path); 512 try { 513 makeDirectory(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, path, directoryDto); 514 } catch (final CollisionException x) { 515 logger.info("CollisionException during makeDirectory: {}", path); 516 if (logger.isDebugEnabled()) 517 logger.debug(x.toString(), x); 518 519 return; 520 } 521 } finally { 522 monitor.done(); 523 } 524 } 525 526 protected void makeDirectory(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 527 final RepoFileDtoTreeNode repoFileDtoTreeNode, final String path, final DirectoryDto directoryDto) { 528 toRepoTransport.makeDirectory(path, directoryDto.getLastModified()); 529 } 530 531 private void syncSymlink( 532 final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 533 final RepoFileDtoTreeNode repoFileDtoTreeNode, final SymlinkDto symlinkDto, final SubProgressMonitor monitor) { 534 monitor.beginTask("Synchronising...", 100); 535 try { 536 final String path = repoFileDtoTreeNode.getPath(); 537 try { 538 toRepoTransport.makeSymlink(path, symlinkDto.getTarget(), symlinkDto.getLastModified()); 539 } catch (final CollisionException x) { 540 logger.info("CollisionException during makeSymlink: {}", path); 541 if (logger.isDebugEnabled()) 542 logger.debug(x.toString(), x); 543 544 return; 545 } 546 } finally { 547 monitor.done(); 548 } 549 } 550 551 private void syncFile(final RepoTransport fromRepoTransport, 552 final RepoTransport toRepoTransport, final RepoFileDtoTreeNode repoFileDtoTreeNode, 553 final RepoFileDto normalFileDto, final ProgressMonitor monitor) { 554 monitor.beginTask("Synchronising...", 100); 555 try { 556 final String path = repoFileDtoTreeNode.getPath(); 557 logger.info("syncFile: path='{}'", path); 558 559 final RepoFileDto fromRepoFileDto = fromRepoTransport.getRepoFileDto(path); 560 if (fromRepoFileDto == null) { 561 logger.warn("File was deleted during sync on source side: {}", path); 562 return; 563 } 564 if (!(fromRepoFileDto instanceof NormalFileDto)) { 565 logger.warn("Normal file was replaced by a directory (or another type) during sync on source side: {}", path); 566 return; 567 } 568 monitor.worked(10); 569 570 final NormalFileDto fromNormalFileDto = (NormalFileDto) fromRepoFileDto; 571 572 final RepoFileDto toRepoFileDto = toRepoTransport.getRepoFileDto(path); 573 if (areFilesExistingAndEqual(fromRepoFileDto, toRepoFileDto)) { 574 logger.info("File is already equal on destination side (sha1='{}'): {}", fromNormalFileDto.getSha1(), path); 575 return; 576 } 577 monitor.worked(10); 578 579 logger.info("Beginning to copy file (from.sha1='{}' to.sha1='{}'): {}", fromNormalFileDto.getSha1(), 580 toRepoFileDto instanceof NormalFileDto ? ((NormalFileDto)toRepoFileDto).getSha1() : "<NoInstanceOf_NormalFileDto>", 581 path); 582 583 final NormalFileDto toNormalFileDto; 584 if (toRepoFileDto instanceof NormalFileDto) 585 toNormalFileDto = (NormalFileDto) toRepoFileDto; 586 else 587 toNormalFileDto = createObject(NormalFileDto.class); // dummy (null-object pattern) 588 589 try { 590 beginPutFile(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, path, fromNormalFileDto); 591 } catch (final CollisionException x) { 592 logger.info("CollisionException during beginPutFile: {}", path); 593 if (logger.isDebugEnabled()) 594 logger.debug(x.toString(), x); 595 596 return; 597 } 598 localRepoTransport.markFileInProgress(fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId(), path, true); 599 monitor.worked(1); 600 601 final Map<Long, FileChunkDto> offset2ToTempFileChunkDto = new HashMap<>(toNormalFileDto.getTempFileChunkDtos().size()); 602 for (final FileChunkDto toTempFileChunkDto : toNormalFileDto.getTempFileChunkDtos()) 603 offset2ToTempFileChunkDto.put(toTempFileChunkDto.getOffset(), toTempFileChunkDto); 604 605 logger.debug("Comparing {} FileChunkDtos. path='{}'", fromNormalFileDto.getFileChunkDtos().size(), path); 606 final List<FileChunkDto> fromFileChunkDtosDirty = new ArrayList<FileChunkDto>(); 607 final Iterator<FileChunkDto> toFileChunkDtoIterator = toNormalFileDto.getFileChunkDtos().iterator(); 608 int fileChunkIndex = -1; 609 for (final FileChunkDto fromFileChunkDto : fromNormalFileDto.getFileChunkDtos()) { 610 final FileChunkDto toFileChunkDto = toFileChunkDtoIterator.hasNext() ? toFileChunkDtoIterator.next() : null; 611 ++fileChunkIndex; 612 final FileChunkDto toTempFileChunkDto = offset2ToTempFileChunkDto.get(fromFileChunkDto.getOffset()); 613 if (toTempFileChunkDto == null) { 614 if (toFileChunkDto != null 615 && equal(fromFileChunkDto.getOffset(), toFileChunkDto.getOffset()) 616 && equal(fromFileChunkDto.getLength(), toFileChunkDto.getLength()) 617 && equal(fromFileChunkDto.getSha1(), toFileChunkDto.getSha1())) { 618 if (logger.isTraceEnabled()) { 619 logger.trace("Skipping clean FileChunkDto. index={} offset={} sha1='{}'", 620 fileChunkIndex, fromFileChunkDto.getOffset(), fromFileChunkDto.getSha1()); 621 } 622 continue; 623 } 624 } 625 else { 626 if (equal(fromFileChunkDto.getOffset(), toTempFileChunkDto.getOffset()) 627 && equal(fromFileChunkDto.getLength(), toTempFileChunkDto.getLength()) 628 && equal(fromFileChunkDto.getSha1(), toTempFileChunkDto.getSha1())) { 629 if (logger.isTraceEnabled()) { 630 logger.trace("Skipping clean temporary FileChunkDto. index={} offset={} sha1='{}'", 631 fileChunkIndex, fromFileChunkDto.getOffset(), fromFileChunkDto.getSha1()); 632 } 633 continue; 634 } 635 } 636 637 if (logger.isTraceEnabled()) { 638 logger.trace("Enlisting dirty FileChunkDto. index={} fromOffset={} toOffset={} fromSha1='{}' toSha1='{}'", 639 fileChunkIndex, fromFileChunkDto.getOffset(), 640 (toFileChunkDto == null ? "null" : toFileChunkDto.getOffset()), 641 fromFileChunkDto.getSha1(), 642 (toFileChunkDto == null ? "null" : toFileChunkDto.getSha1())); 643 } 644 fromFileChunkDtosDirty.add(fromFileChunkDto); 645 } 646 647 logger.info("Need to copy {} dirty file-chunks (of {} total). path='{}'", 648 fromFileChunkDtosDirty.size(), fromNormalFileDto.getFileChunkDtos().size(), path); 649 650 final ProgressMonitor subMonitor = new SubProgressMonitor(monitor, 73); 651 subMonitor.beginTask("Synchronising...", fromFileChunkDtosDirty.size()); 652 fileChunkIndex = -1; 653 long bytesCopied = 0; 654 final long copyChunksBeginTimestamp = System.currentTimeMillis(); 655 for (final FileChunkDto fileChunkDto : fromFileChunkDtosDirty) { 656 ++fileChunkIndex; 657 if (logger.isTraceEnabled()) { 658 logger.trace("Reading data for dirty FileChunkDto (index {} of {}). path='{}' offset={}", 659 fileChunkIndex, fromFileChunkDtosDirty.size(), path, fileChunkDto.getOffset()); 660 } 661 final byte[] fileData = getFileData(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, path, fileChunkDto); 662 663 if (fileData == null) { 664 logger.warn("Source file was modified or deleted during sync: {}", path); 665 // The file is left in state 'inProgress'. Thus it should definitely not be synced back in the opposite 666 // direction. The file should be synced again in the correct direction in the next run (after the source 667 // repo did a local sync, too). 668 return; 669 } 670 671 if (logger.isTraceEnabled()) { 672 logger.trace("Writing data for dirty FileChunkDto ({} of {}). path='{}' offset={}", 673 fileChunkIndex + 1, fromFileChunkDtosDirty.size(), path, fileChunkDto.getOffset()); 674 } 675 676 try { 677 putFileData(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, path, fileChunkDto, fileData); 678 } catch (final CollisionException x) { // Never happens in CloudStore, but in down-stream-projects. Important: They must handle this properly themselves! 679 logger.info("CollisionException during putFileData: {}", path); 680 if (logger.isDebugEnabled()) 681 logger.debug(x.toString(), x); 682 683 return; 684 } 685 686 bytesCopied += fileData.length; 687 subMonitor.worked(1); 688 } 689 subMonitor.done(); 690 691 logger.info("Copied {} dirty file-chunks with together {} bytes in {} ms. path='{}'", 692 fromFileChunkDtosDirty.size(), bytesCopied, System.currentTimeMillis() - copyChunksBeginTimestamp, path); 693 694 endPutFile(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, path, fromNormalFileDto); 695 localRepoTransport.markFileInProgress(fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId(), path, false); 696 monitor.worked(6); 697 } finally { 698 monitor.done(); 699 } 700 } 701 702 protected byte[] getFileData(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 703 final RepoFileDtoTreeNode repoFileDtoTreeNode, 704 final String path, final FileChunkDto fileChunkDto) { 705 706 final byte[] fileData = fromRepoTransport.getFileData(path, fileChunkDto.getOffset(), fileChunkDto.getLength()); 707 if (fileData == null) 708 return null; // file was deleted 709 710 if (fileData.length != fileChunkDto.getLength() || !sha1(fileData).equals(fileChunkDto.getSha1())) 711 return null; // file was modified 712 713 return fileData; 714 } 715 716 protected void putFileData(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 717 final RepoFileDtoTreeNode repoFileDtoTreeNode, 718 final String path, final FileChunkDto fileChunkDto, 719 final byte[] fileData) { 720 721 toRepoTransport.putFileData(path, fileChunkDto.getOffset(), fileData); 722 } 723 724 protected void beginPutFile(final RepoTransport fromRepoTransport, 725 final RepoTransport toRepoTransport, final RepoFileDtoTreeNode repoFileDtoTreeNode, 726 final String path, final NormalFileDto fromNormalFileDto) throws CollisionException { 727 728 toRepoTransport.beginPutFile(path); 729 } 730 731 protected void endPutFile(final RepoTransport fromRepoTransport, 732 final RepoTransport toRepoTransport, final RepoFileDtoTreeNode repoFileDtoTreeNode, 733 final String path, final NormalFileDto fromNormalFileDto) { 734 735 toRepoTransport.endPutFile( 736 path, fromNormalFileDto.getLastModified(), 737 fromNormalFileDto.getLength(), fromNormalFileDto.getSha1()); 738 } 739 740 private boolean areFilesExistingAndEqual(final RepoFileDto fromRepoFileDto, final RepoFileDto toRepoFileDto) { 741 if (!(fromRepoFileDto instanceof NormalFileDto)) 742 return false; 743 744 if (!(toRepoFileDto instanceof NormalFileDto)) 745 return false; 746 747 final NormalFileDto fromNormalFileDto = (NormalFileDto) fromRepoFileDto; 748 final NormalFileDto toNormalFileDto = (NormalFileDto) toRepoFileDto; 749 750 return equal(fromNormalFileDto.getLength(), toNormalFileDto.getLength()) 751 && equal(fromNormalFileDto.getLastModified(), toNormalFileDto.getLastModified()) 752 && equal(fromNormalFileDto.getSha1(), toNormalFileDto.getSha1()); 753 } 754 755 @Override 756 public void close() { 757 localRepoManager.close(); 758 localRepoTransport.close(); 759 remoteRepoTransport.close(); 760 } 761}