java调用Rsync并发迁移数据并执行校验
java代码如下
RsyncFile.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
import lombok.NoArgsConstructor; import lombok.SneakyThrows;
import java.io.*; import java.util.ArrayList; import java.util.Date; import java.util.concurrent.*;
/** * @ClassName RsyncFile * @Descriptiom TODO rsync多线程同步迁移数据 * @Author KING * @Date 2019/11/25 09:17 * @Version 1.2.2 * rsync -vzrtopg --progress --delete //镜像同步 **/ @NoArgsConstructor public class RsyncFile implements Runnable{ private static final int availProcessors = Runtime.getRuntime().availableProcessors(); //构造以cpu核心数为核心池,cpu线程数为最大池,超时时间为1s,线程队列为大小为无界的安全阻塞线程队列,拒绝策略为DiscardOldestPolicy()的线程池。(同步数据当然不能丢下拒绝任务) private ExecutorService ThreadPool = new ThreadPoolExecutor(availProcessors >> 1 , availProcessors, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy());
//保存扫描得到的文件列表 private static ArrayList<String> fileNameList = new ArrayList<String>(); private String shellname; private String filename; private String userip; private CountDownLatch countDownLatch; private static int deep = 0 ;
public RsyncFile(String ShellName, String filename, String UserIP, CountDownLatch countDownLatch) { this .shellname = ShellName; this .filename = filename; this .userip = UserIP; this .countDownLatch = countDownLatch; }
public static void main(String[] args) { try { new RsyncFile().Do(args[ 0 ],args[ 1 ],Integer.parseInt(args[ 2 ])); } catch (ArrayIndexOutOfBoundsException e){ System.out.println(e); System.out.println( "Error , args send fault" ); System.out.println( "please send localAddress remote username @ remote IP or hostname and catalogue" ); System.out.println( "like this [ /home/test/ root@node1:/test/ 1 ]" ); } catch (NumberFormatException e1){ System.out.println(e1); System.out.println( "please input Right Directory depth, this number must be int" ); System.out.println( "like this [ /home/test/ root@node1:/test/ 1 ]" ); } }
@SneakyThrows private void Do(String content,String UserIP, int setdeep){ System.out.println( "开始执行" ); System.out.println( "开始时间:" + new Date()); Long a = System.nanoTime(); File file = new File(content); System.out.println( "开始扫描本地指定目录" ); GetAllFile(file,setdeep); //按深度扫描非空文件夹和文件 System.out.println( "扫描本地目录完成" );
//给脚本赋予权限 String [] cmd={ "/bin/sh" , "-c" , "chmod 755 ./do*" }; Runtime.getRuntime().exec(cmd); //创建远端目录操作 System.out.println( "开始创建远端目录结构" ); //一次计数锁用于保证目录创建完成 CountDownLatch doDirLock = new CountDownLatch( 1 ); ThreadPool.execute( new RsyncFile( "./doDirc.sh" ,content,UserIP,doDirLock)); doDirLock.await(); System.out.println( "创建远端目录结构完成" );
//开始同步工作 System.out.println( "开始执行同步工作" ); System.out.println( "同步的文件夹或文件总数: " + fileNameList.size()); System.out.println( "正在同步。。。。。" ); //fileNameList.size()次计数锁用于保证数据同步完成(保证计时准确) CountDownLatch rsyncLock = new CountDownLatch(fileNameList.size()); System.out.println(fileNameList.size()); for (String fileName:fileNameList) { //除去文件名中与UserIP重复的文件路径 String RemoteDir = UserIP.concat(fileName.replace(content, "" )); System.out.println( "要同步的本地目录或文件: " + fileName); System.out.println( "要同步的远端目录或文件: " + RemoteDir); ThreadPool.execute( new RsyncFile( "./doRync.sh" ,fileName, RemoteDir,rsyncLock)); } rsyncLock.await(); System.out.println( "执行同步工作完成" );
//开始文件校验工作 System.out.println( "执行文件校验及重传" ); //一次计数锁用于保证校验完成 CountDownLatch chechSumLock = new CountDownLatch( 1 ); ThreadPool.execute( new RsyncFile( "./doChecksum.sh" ,content,UserIP,chechSumLock)); chechSumLock.await(); System.out.println( "文件校验及重传完成" ); ThreadPool.shutdown();
Long b = System.nanoTime(); Long aLong = (b - a)/1000000L; System.out.println( "处理时间" + aLong + "ms" ); System.out.println( "结束时间:" + new Date());
}
/** * 执行rsync脚本的线程方法,使用PrintWriter来与linux Terminal交互 */ @Override public void run() { try { String command=shellname.concat( " " ).concat(filename).concat( " " ).concat(userip); File wd = new File( "/bin" ); Process process = null ; process = Runtime.getRuntime().exec( "/bin/bash" , null , wd); if (process != null ) { InputStream is = process.getInputStream(); BufferedReader reader = new BufferedReader( new InputStreamReader(is)); PrintWriter out = new PrintWriter( new BufferedWriter( new OutputStreamWriter(process.getOutputStream())), true ); //切换到当前class文件所在目录 out.println( "cd " + System.getProperty( "user.dir" )); out.println(command); out.println( "exit" ); StringBuilder sb = new StringBuilder(); String line; while ((line = reader.readLine()) != null ) { sb.append(line + System.lineSeparator()); } process.waitFor(); reader.close(); out.close(); process.destroy(); System.out.println( "result:" + sb.toString()); } else { System.out.println( "找不到系统bash工具,请检查系统是否异常,并为系统创建/bin/sh的bash工具软连接" ); } } catch (Exception e) { System.err.println(e.getMessage()); } finally { //倒记数锁释放一次 countDownLatch.countDown(); } }
/**遍历指定的目录并能指定深度 * @param file 指定要遍历的目录 * @param setDeep 设定遍历深度 */ @SneakyThrows private static void GetAllFile(File file, int setDeep) { if (file != null ){ if (file.isDirectory() && deep<setDeep){ deep++; File f[] = file.listFiles(); if (f != null ) { int length = f.length; for ( int i = 0 ; i < length; i++) GetAllFile(f[i],setDeep); deep--; } } else { if (file.isDirectory()){ //如果为目录末尾添加 / 保证rsync正常处理 fileNameList.add(file.getAbsolutePath().concat( "/" )); } else { fileNameList.add(file.getAbsolutePath()); } } } }
} |
doDir.sh
1 2 |
rsync -av --include= '*/' --exclude= '*' $1 $2 | tee -a /tmp/rsync .log 2>&1 echo "创建目录结构操作" |
doRsync.sh
1 |
rsync -avzi --stats --progress $1 $2 | tee -a /tmp/rsync .log 2>&1 |
doChecksum.sh
1 |
rsync -acvzi --stats --progress $1 $2 | tee -a /tmp/checksum .log 2>&1 |
附录
rsync输出日志说明如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
YXcstpoguax path/to/file ||||||||||| ||||||||||╰- x: The extended attribute information changed |||||||||╰-- a: The ACL information changed ||||||||╰--- u: The u slot is reserved for future use |||||||╰---- g: Group is different ||||||╰----- o: Owner is different |||||╰------ p: Permission are different ||||╰------- t: Modification time is different |||╰-------- s: Size is different ||╰--------- c: Different checksum (for regular files), or || changed value (for symlinks, devices, and special files) |╰---------- the file type: | f: for a file, | d: for a directory, | L: for a symlink, | D: for a device, | S: for a special file (e.g. named sockets and fifos) ╰----------- the type of update being done:: <: file is being transferred to the remote host (sent) >: file is being transferred to the local host (received) c: local change/creation for the item, such as: - the creation of a directory - the changing of a symlink, - etc. h: the item is a hard link to another item (requires --hard-links). .: the item is not being updated (though it might have attributes that are being modified) *: means that the rest of the itemized-output area contains a message (e.g. "deleting") |
到此这篇关于Java之Rsync并发迁移数据并校验详解的文章就介绍到这了,更多相关Java之Rsync并发内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
原文链接:https://blog.csdn.net/w4187402/article/details/103274618
查看更多关于Java之Rsync并发迁移数据并校验详解的详细内容...