// WarcIndexManager.cs
  1. using System.Text;
  2. using System.Threading.Channels;
  3. using WarcViewerBlazorWinForm.Backend.IO;
  4. using WarcViewerBlazorWinForm.Backend.IO.Archiving;
  5. using WarcViewerBlazorWinForm.Library.AssemblyInject.Interfaces;
  6. using WarcViewerBlazorWinForm.Library.EventBus;
  7. namespace WarcViewerBlazorWinForm.Backend.Warc;
  8. internal class WarcIndexManager : IAssemblyInjectSingleton, IAssemblyInjectSyncStarStop
  9. {
  10. private const long Gib = 1024 * 1024 * 1024;
  11. private static class FolderLayer
  12. {
  13. // normal
  14. // <input file>.idx/[archive key/]
  15. // <input file>.idx/[archive key/]
  16. // fallback
  17. // <app path>/fallback index/<input file>/[archive key/]
  18. // file name
  19. // 0.index.xz
  20. public static string MakeIndexDirOrFallback(FileDescriptor fd)
  21. {
  22. var inputFileFullPath = Path.GetFullPath(fd.FilePath);
  23. var dir = inputFileFullPath + ".idx";
  24. if (fd.IsReadDirectly == false && fd.ArchiveEntryKey != null) dir = Path.Combine(dir, fd.ArchiveEntryKey.Trim().TrimStart('/', '\\'));
  25. if (Directory.Exists(dir)) return dir;
  26. try
  27. {
  28. Directory.CreateDirectory(dir);
  29. return dir;
  30. }
  31. catch (IOException)
  32. {
  33. var fallbackDir = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "fallback index", Path.GetFileNameWithoutExtension(fd.FilePath), fd.ArchiveEntryKey ?? "");
  34. Directory.CreateDirectory(fallbackDir);
  35. return fallbackDir;
  36. }
  37. }
  38. public static string MakeIndexFileName(int gbIndex) => $"{gbIndex:00000}.index.xz";
  39. }
  40. private class IndexFormat
  41. {
  42. // int32 - NumberOfEntry
  43. // int32 - LastEntrySize for check index integrity
  44. // int64* - BLOB entry offset
  45. // LINES* - LINES: <URL> <TYPE:M, S, R, O>
  46. public List<long> Offsets { get; }
  47. public int? LastEntrySize { get; set; }
  48. public List<(string url, string type)> Urls { get; }
  49. public IndexFormat()
  50. {
  51. Offsets = [];
  52. Urls = [];
  53. }
  54. public IndexFormat(Stream fromStream)
  55. {
  56. if (fromStream == null) throw new ArgumentNullException(nameof(fromStream));
  57. var br = new BinaryReader(fromStream);
  58. var sr = new StreamReader(fromStream);
  59. var numberOfEntry = br.ReadInt32();
  60. LastEntrySize = br.ReadInt32();
  61. Offsets = new List<long>(numberOfEntry);
  62. Urls = new List<(string, string)>(numberOfEntry);
  63. for (var i = 0; i < numberOfEntry; i++) Offsets.Add(br.ReadInt64());
  64. for (var i = 0; i < numberOfEntry; i++)
  65. {
  66. var line = sr.ReadLine();
  67. var parts = line!.Split(' ', 2);
  68. Urls.Add((parts[0], parts[1]));
  69. }
  70. }
  71. public void WriteToStream(Stream stream)
  72. {
  73. if (LastEntrySize.HasValue == false) throw new InvalidOperationException("LastEntrySize can not be null");
  74. var count = Offsets.Count;
  75. if (count != Urls.Count) throw new InvalidOperationException("count not matched between Offsets and Urls");
  76. var ms = new MemoryStream();
  77. using var bw = new BinaryWriter(ms, Encoding.ASCII, true);
  78. using var sw = new StreamWriter(ms, leaveOpen: true);
  79. bw.Write(count);
  80. bw.Write(LastEntrySize.Value);
  81. foreach (var offset in Offsets) bw.Write(offset);
  82. foreach (var tuple in Urls) sw.WriteLine($"{tuple.type} {tuple.url}");
  83. sw.Close();
  84. bw.Close();
  85. ms.Position = 0;
  86. ms.CopyTo(stream);
  87. }
  88. }
  89. private readonly CancellationTokenSource _cancellationTokenSource = new();
  90. private readonly Channel<FileDescriptor> _queue = Channel.CreateBounded<FileDescriptor>(int.MaxValue);
  91. private readonly IEventBus _eventBus;
  92. private bool _isWorking = false;
  93. private FileDescriptor? _currentFileDescriptor;
  94. private float? _currentProgress;
  95. private readonly SortedList<string, WarcIndexEntry> _indexedEntries = new();
  96. public WarcIndexManager(IEventBus eventBus)
  97. {
  98. _eventBus = eventBus;
  99. _eventBus.Subscript<WarcIndexerLoadRequestEvent>(obj => _ = LoadIndexAsync(obj.FileDescriptor));
  100. _eventBus.Subscript<WarcIndexerStatusRequestEvent>(_ => PublishStatus());
  101. _eventBus.Subscript<WarcIndexerQueryRequestEvent>(DoQuery);
  102. }
  103. private void PublishStatus() => _eventBus.Publish(new WarcIndexerStatusEvent(_isWorking, _queue.Reader.Count, _currentFileDescriptor, _currentProgress));
  104. private async Task LoadIndexAsync(FileDescriptor fd)
  105. {
  106. var dir = FolderLayer.MakeIndexDirOrFallback(fd);
  107. var len = await FileReader.GetLengthAsync(fd);
  108. var gbIndex = 0;
  109. do
  110. {
  111. var indexFilePath = Path.Combine(dir, FolderLayer.MakeIndexFileName(gbIndex));
  112. if (File.Exists(indexFilePath) == false)
  113. {
  114. await _queue.Writer.WriteAsync(fd);
  115. return;
  116. }
  117. var indexContent = new IndexFormat(XzRwOps.OpenReadXzFile(indexFilePath));
  118. var batch = new List<WarcIndexEntry>();
  119. for (var i = 0; i < indexContent.Offsets.Count; i++)
  120. {
  121. var (url, type) = indexContent.Urls[i];
  122. batch.Add(new WarcIndexEntry(fd, indexContent.Offsets[i], type, url));
  123. }
  124. lock (_indexedEntries)
  125. {
  126. foreach (var item in batch)
  127. {
  128. _indexedEntries.Add(item.Url, item);
  129. }
  130. }
  131. if (indexContent.Offsets.Last() + indexContent.LastEntrySize == len)
  132. {
  133. return;
  134. }
  135. gbIndex++;
  136. } while (true);
  137. }
  138. private void DoQuery(WarcIndexerQueryRequestEvent obj)
  139. {
  140. lock (_indexedEntries)
  141. {
  142. IEnumerable<WarcIndexEntry> source = _indexedEntries.Values;
  143. if (obj.Filter != null) source = source.Where(obj.Filter);
  144. var resultSet = source.Skip(obj.PageSize * (obj.PageNumber - 1)).Take(obj.PageSize).ToArray();
  145. var result = new WarcIndexerQueryResultEvent(obj.EventId, resultSet, _indexedEntries.Count);
  146. _eventBus.Publish(result);
  147. }
  148. }
  149. public void Start()
  150. {
  151. Task.Run(async () =>
  152. {
  153. while (_cancellationTokenSource.IsCancellationRequested == false)
  154. {
  155. _isWorking = false;
  156. _currentFileDescriptor = null;
  157. _currentProgress = null;
  158. PublishStatus();
  159. var fileDescriptor = await _queue.Reader.ReadAsync(_cancellationTokenSource.Token);
  160. _isWorking = true;
  161. _currentFileDescriptor = fileDescriptor;
  162. PublishStatus();
  163. var length = await FileReader.GetLengthAsync(fileDescriptor);
  164. var indexDir = FolderLayer.MakeIndexDirOrFallback(fileDescriptor);
  165. var existIndexFile = Directory.GetFiles(indexDir).OrderBy(p => p).ToArray();
  166. long loadPosition = 0;
  167. var lastExistIndexFile = existIndexFile.LastOrDefault();
  168. if (lastExistIndexFile != null)
  169. {
  170. var indexContent = new IndexFormat(XzRwOps.OpenReadXzFile(lastExistIndexFile));
  171. loadPosition = indexContent.Offsets.Last() + indexContent.LastEntrySize!.Value;
  172. if (loadPosition == length) continue; // skip completely indexed
  173. }
  174. // Start(continue) indexing
  175. await using var stream = await FileReader.OpenReadStreamAsync(fileDescriptor);
  176. stream.SeekForwardStupid(loadPosition);
  177. _currentProgress = (float)loadPosition / length;
  178. PublishStatus();
  179. IndexFormat? indexingFile = null;
  180. var indexingGb = (int)(loadPosition / Gib);
  181. int? lastBlockSize;
  182. do
  183. {
  184. if (_cancellationTokenSource.IsCancellationRequested) return;
  185. if (indexingFile == null) indexingFile = new();
  186. //handle index entry
  187. indexingFile.Offsets.Add(loadPosition);
  188. var block = WarcParser.ReadEntryBlock(stream);
  189. lastBlockSize = block.Length;
  190. loadPosition += block.Length;
  191. _eventBus.Publish(new WarcIndexerStatusEvent(true, _queue.Reader.Count, fileDescriptor, (float)loadPosition / length));
  192. var entry = WarcParser.ParseBlockForIndexing(block);
  193. indexingFile.Urls.Add((entry.Url, entry.Type));
  194. //check per gib split and flip
  195. var nowGb = (int)(loadPosition / Gib);
  196. if (nowGb != indexingGb)
  197. {
  198. indexingFile.LastEntrySize = block.Length;
  199. await using var xzWriteStream = XzRwOps.OpenWriteXzFile(Path.Combine(indexDir, FolderLayer.MakeIndexFileName(indexingGb)));
  200. indexingFile.WriteToStream(xzWriteStream);
  201. indexingFile = null;
  202. indexingGb = nowGb;
  203. }
  204. } while (loadPosition < length);
  205. if (indexingFile != null) //write last part of index
  206. {
  207. indexingFile.LastEntrySize = lastBlockSize.Value;
  208. await using var xzWriteStream = XzRwOps.OpenWriteXzFile(Path.Combine(indexDir, FolderLayer.MakeIndexFileName(indexingGb)));
  209. indexingFile.WriteToStream(xzWriteStream);
  210. }
  211. }
  212. });
  213. }
  214. public void Stop() => _cancellationTokenSource.Cancel();
  215. }