WarcIndexManager.cs 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. using System.Text;
  2. using System.Threading.Channels;
  3. using WarcViewerBlazorWinForm.Backend.IO;
  4. using WarcViewerBlazorWinForm.Backend.IO.Archiving;
  5. using WarcViewerBlazorWinForm.Library.AssemblyInject.Interfaces;
  6. using WarcViewerBlazorWinForm.Library.EventBus;
  7. namespace WarcViewerBlazorWinForm.Backend.Warc;
  /// <summary>
  /// Builds, stores and loads the per-file WARC index.
  /// <para>
  /// A <see cref="WarcIndexerLoadRequestEvent"/> publishes every stored index chunk as a
  /// <see cref="WarcIndexerLoadedEvent"/>; when the stored chunks do not reach the end of the input,
  /// the file is queued for the background worker started by <see cref="Start"/>.
  /// </para>
  /// </summary>
  internal class WarcIndexManager : IAssemblyInjectSingleton, IAssemblyInjectSyncStarStop
  {
      // A new index chunk file is started each time the read position crosses a multiple of this size.
      private const long Gib = 1024 * 1024 * 1024;

      private static class FolderLayer
      {
          // Directory layout
          //   normal:   <input file>.idx/[archive key/]
          //   fallback: <app path>/fallback index/<input file name without extension>/[archive key/]
          // Chunk file names (zero padded, so ordinal sorting equals numeric order)
          //   00000.index.xz, 00001.index.xz, ...
          private const string IndexFileSuffix = ".index.xz";

          /// <summary>
          /// Returns the index directory for <paramref name="fd"/>, creating it if necessary.
          /// When the directory next to the input file cannot be created, a directory under the
          /// application base directory is used instead.
          /// </summary>
          public static string MakeIndexDirOrFallback(FileDescriptor fd)
          {
              var entryKey = GetEntryKeySubPath(fd);
              var dir = Path.GetFullPath(fd.FilePath) + ".idx";
              if (entryKey != null) dir = Path.Combine(dir, entryKey);
              if (Directory.Exists(dir)) return dir;
              try
              {
                  Directory.CreateDirectory(dir);
                  return dir;
              }
              catch (Exception e) when (e is IOException or UnauthorizedAccessException)
              {
                  // Read-only location or missing permission next to the input file.
                  // NOTE(review): different input files with the same file name share one fallback directory.
                  var fallbackDir = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "fallback index", Path.GetFileNameWithoutExtension(fd.FilePath), entryKey ?? "");
                  Directory.CreateDirectory(fallbackDir);
                  return fallbackDir;
              }
          }

          // NOTE(review): keys containing ".." segments can resolve outside the index directory — confirm keys are sanitized upstream.
          /// <summary>
          /// Relative sub directory for an archive entry, or null when the input is read directly
          /// (same rule for the normal and the fallback location). Leading separators are stripped
          /// because <see cref="Path.Combine(string, string)"/> discards everything before a rooted segment.
          /// </summary>
          private static string? GetEntryKeySubPath(FileDescriptor fd) =>
              fd.IsReadDirectly == false && fd.ArchiveEntryKey != null
                  ? fd.ArchiveEntryKey.Trim().TrimStart('/', '\\')
                  : null;

          /// <summary>File name of index chunk number <paramref name="gbIndex"/>.</summary>
          public static string MakeIndexFileName(int gbIndex) => $"{gbIndex:00000}{IndexFileSuffix}";

          /// <summary>
          /// Existing index chunk files in <paramref name="indexDir"/>, in chunk order.
          /// Chunk numbers can have gaps (one record larger than 1 GiB crosses two boundaries at once),
          /// so callers enumerate the files instead of probing consecutive numbers.
          /// </summary>
          public static string[] GetIndexFiles(string indexDir) =>
              Directory.GetFiles(indexDir, "*" + IndexFileSuffix)
                  .OrderBy(p => p, StringComparer.Ordinal)
                  .ToArray();
      }

      private class IndexFormat
      {
          // Layout (integers written by BinaryWriter):
          // int32  - NumberOfEntry
          // int32  - LastEntrySize, used to check index integrity / completeness
          // int64* - BLOB entry offset, NumberOfEntry times
          // LINES* - NumberOfEntry text lines "<TYPE> <URL>" (TYPE: M, S, R, O)
          //          TYPE comes first so that everything after the first space is the URL.
          public List<long> Offsets { get; }
          public int? LastEntrySize { get; set; }
          public List<(string url, string type)> Urls { get; }

          /// <summary>Position right after the last entry, or null when there is no entry / size.</summary>
          public long? EndPosition => Offsets.Count > 0 && LastEntrySize.HasValue ? Offsets[^1] + LastEntrySize.Value : null;

          public IndexFormat()
          {
              Offsets = [];
              Urls = [];
          }

          /// <summary>Parses an index chunk; the caller keeps ownership of <paramref name="fromStream"/>.</summary>
          /// <exception cref="InvalidDataException">The chunk is truncated or malformed.</exception>
          public IndexFormat(Stream fromStream)
          {
              if (fromStream == null) throw new ArgumentNullException(nameof(fromStream));
              // BinaryReader consumes exactly the bytes of the fixed-size header and offsets,
              // so the StreamReader created afterwards starts at the text section.
              using var br = new BinaryReader(fromStream, Encoding.UTF8, leaveOpen: true);
              var numberOfEntry = br.ReadInt32();
              if (numberOfEntry < 0) throw new InvalidDataException($"Corrupt index: negative entry count {numberOfEntry}");
              LastEntrySize = br.ReadInt32();
              Offsets = new List<long>(numberOfEntry);
              Urls = new List<(string, string)>(numberOfEntry);
              for (var i = 0; i < numberOfEntry; i++) Offsets.Add(br.ReadInt64());
              using var sr = new StreamReader(fromStream, leaveOpen: true);
              for (var i = 0; i < numberOfEntry; i++)
              {
                  var line = sr.ReadLine() ?? throw new InvalidDataException("Corrupt index: fewer lines than entries");
                  var parts = line.Split(' ', 2);
                  if (parts.Length != 2) throw new InvalidDataException($"Corrupt index: malformed line '{line}'");
                  // Lines are "<TYPE> <URL>" (see WriteToStream).
                  Urls.Add((url: parts[1], type: parts[0]));
              }
          }

          /// <summary>Serializes this chunk into <paramref name="stream"/> using the layout above.</summary>
          /// <exception cref="InvalidOperationException">LastEntrySize is unset or the list counts differ.</exception>
          public void WriteToStream(Stream stream)
          {
              if (LastEntrySize.HasValue == false) throw new InvalidOperationException("LastEntrySize can not be null");
              var count = Offsets.Count;
              if (count != Urls.Count) throw new InvalidOperationException("count not matched between Offsets and Urls");
              // Assemble the whole chunk in memory, then copy it to the target stream.
              using var ms = new MemoryStream();
              using (var bw = new BinaryWriter(ms, Encoding.ASCII, true))
              {
                  bw.Write(count);
                  bw.Write(LastEntrySize.Value);
                  foreach (var offset in Offsets) bw.Write(offset);
              }
              using (var sw = new StreamWriter(ms, leaveOpen: true))
              {
                  foreach (var (url, type) in Urls) sw.WriteLine($"{type} {url}");
              }
              ms.Position = 0;
              ms.CopyTo(stream);
          }
      }

      private readonly CancellationTokenSource _cancellationTokenSource = new();
      // Unbounded so that queuing from a load request never waits; only the worker in Start() reads.
      private readonly Channel<FileDescriptor> _queue = Channel.CreateUnbounded<FileDescriptor>(new UnboundedChannelOptions { SingleReader = true });
      private readonly IEventBus _eventBus;
      private bool _isWorking = false;
      private FileDescriptor? _currentFileDescriptor;
      private float? _currentProgress;

      public WarcIndexManager(IEventBus eventBus)
      {
          _eventBus = eventBus;
          _eventBus.Subscript<WarcIndexerLoadRequestEvent>(obj => _ = LoadIndexAsync(obj.FileDescriptor));
          _eventBus.Subscript<WarcIndexerStatusRequestEvent>(_ => PublishStatus());
      }

      private void PublishStatus() => _eventBus.Publish(new WarcIndexerStatusEvent(_isWorking, _queue.Reader.Count, _currentFileDescriptor, _currentProgress));

      /// <summary>Stores the current progress (1 for an empty input) and publishes the status.</summary>
      private void ReportProgress(long position, long length)
      {
          _currentProgress = length > 0 ? (float)position / length : 1f;
          PublishStatus();
      }

      /// <summary>
      /// Publishes every stored index chunk of <paramref name="fd"/> as a <see cref="WarcIndexerLoadedEvent"/>.
      /// Returns as soon as a chunk ends exactly at the input length; otherwise queues the file for indexing.
      /// </summary>
      private async Task LoadIndexAsync(FileDescriptor fd)
      {
          var dir = FolderLayer.MakeIndexDirOrFallback(fd);
          var len = await FileReader.GetLengthAsync(fd);
          foreach (var indexFilePath in FolderLayer.GetIndexFiles(dir))
          {
              IndexFormat indexContent;
              using (var xzStream = XzRwOps.OpenReadXzFile(indexFilePath))
              {
                  indexContent = new IndexFormat(xzStream);
              }
              var batch = new List<WarcIndexEntry>(indexContent.Offsets.Count);
              for (var i = 0; i < indexContent.Offsets.Count; i++)
              {
                  var (url, type) = indexContent.Urls[i];
                  batch.Add(new WarcIndexEntry(fd, indexContent.Offsets[i], type, url));
              }
              _eventBus.Publish(new WarcIndexerLoadedEvent(batch));
              if (indexContent.EndPosition == len) return; // completely indexed
          }
          await _queue.Writer.WriteAsync(fd);
      }

      /// <summary>
      /// Starts the background worker: it takes queued files one at a time and indexes them,
      /// publishing <see cref="WarcIndexerStatusEvent"/> updates. A failure on one file is traced
      /// and does not stop the worker; <see cref="Stop"/> ends it.
      /// </summary>
      public void Start()
      {
          var token = _cancellationTokenSource.Token;
          _ = Task.Run(async () =>
          {
              while (token.IsCancellationRequested == false)
              {
                  _isWorking = false;
                  _currentFileDescriptor = null;
                  _currentProgress = null;
                  PublishStatus();
                  FileDescriptor fileDescriptor;
                  try
                  {
                      fileDescriptor = await _queue.Reader.ReadAsync(token);
                  }
                  catch (OperationCanceledException)
                  {
                      return; // stopped while waiting for work
                  }
                  _isWorking = true;
                  _currentFileDescriptor = fileDescriptor;
                  PublishStatus();
                  try
                  {
                      await IndexFileAsync(fileDescriptor, token);
                  }
                  catch (OperationCanceledException) when (token.IsCancellationRequested)
                  {
                      return; // stopped mid-file; written chunks let the next run resume
                  }
                  catch (Exception e)
                  {
                      // Keep the worker alive: one unreadable or corrupt file must not block the rest of the queue.
                      System.Diagnostics.Trace.TraceError($"WARC indexing failed for '{fileDescriptor.FilePath}': {e}");
                  }
              }
          });
      }

      /// <summary>
      /// Indexes <paramref name="fileDescriptor"/> from the end of its last stored chunk to the end of the input,
      /// writing a chunk file each time the read position crosses a GiB boundary and a final chunk at the end.
      /// </summary>
      private async Task IndexFileAsync(FileDescriptor fileDescriptor, CancellationToken token)
      {
          var length = await FileReader.GetLengthAsync(fileDescriptor);
          var indexDir = FolderLayer.MakeIndexDirOrFallback(fileDescriptor);
          long loadPosition = 0;
          var lastExistIndexFile = FolderLayer.GetIndexFiles(indexDir).LastOrDefault();
          if (lastExistIndexFile != null)
          {
              IndexFormat lastIndex;
              using (var xzStream = XzRwOps.OpenReadXzFile(lastExistIndexFile))
              {
                  lastIndex = new IndexFormat(xzStream);
              }
              loadPosition = lastIndex.EndPosition ?? throw new InvalidDataException($"Index file has no entries: {lastExistIndexFile}");
              if (loadPosition == length) return; // skip completely indexed
          }
          token.ThrowIfCancellationRequested();

          // Start (or continue) indexing
          await using var stream = await FileReader.OpenReadStreamAsync(fileDescriptor);
          stream.SeekForwardStupid(loadPosition);
          ReportProgress(loadPosition, length);
          IndexFormat? indexingFile = null;
          var indexingGb = (int)(loadPosition / Gib);
          while (loadPosition < length)
          {
              token.ThrowIfCancellationRequested();
              indexingFile ??= new IndexFormat();
              indexingFile.Offsets.Add(loadPosition);
              var block = WarcParser.ReadEntryBlock(stream);
              // A zero-length block would never advance loadPosition and loop forever.
              if (block.Length == 0) throw new InvalidDataException($"Empty WARC block at offset {loadPosition} of {length}");
              loadPosition += block.Length;
              indexingFile.LastEntrySize = block.Length;
              ReportProgress(loadPosition, length);
              var entry = WarcParser.ParseBlockForIndexing(block);
              indexingFile.Urls.Add((entry.Url, entry.Type));
              // Flush the chunk once the read position crosses a GiB boundary.
              var nowGb = (int)(loadPosition / Gib);
              if (nowGb != indexingGb)
              {
                  await WriteIndexFileAsync(indexDir, indexingGb, indexingFile);
                  indexingFile = null;
                  indexingGb = nowGb;
              }
          }
          if (indexingFile != null) await WriteIndexFileAsync(indexDir, indexingGb, indexingFile); // last part of index
      }

      /// <summary>Writes <paramref name="content"/> as chunk <paramref name="gbIndex"/> in <paramref name="indexDir"/>.</summary>
      private static async Task WriteIndexFileAsync(string indexDir, int gbIndex, IndexFormat content)
      {
          await using var xzWriteStream = XzRwOps.OpenWriteXzFile(Path.Combine(indexDir, FolderLayer.MakeIndexFileName(gbIndex)));
          content.WriteToStream(xzWriteStream);
      }

      public void Stop() => _cancellationTokenSource.Cancel();

      public enum IndexingState
      {
          NotIndexed,
          PartIndexed,
          CompletelyIndexed
      }
  }
  200. CompletelyIndexed
  201. }
  202. }