using System.Text;
using System.Threading.Channels;
using WarcViewerBlazorWinForm.Backend.IO;
using WarcViewerBlazorWinForm.Backend.IO.Archiving;
using WarcViewerBlazorWinForm.Library.AssemblyInject.Interfaces;
using WarcViewerBlazorWinForm.Library.EventBus;

namespace WarcViewerBlazorWinForm.Backend.Warc;

/// <summary>
/// Loads existing WARC index files from disk and builds missing ones on a background worker.
/// Loaded entries are announced with <c>WarcIndexerLoadedEvent</c>; worker state is announced
/// with <c>WarcIndexerStatusEvent</c>.
/// </summary>
internal class WarcIndexManager : IAssemblyInjectSingleton, IAssemblyInjectSyncStarStop
{
    /// <summary>Bytes per GiB. A new index file is started whenever the read position crosses a multiple of this.</summary>
    private const long Gib = 1024 * 1024 * 1024;

    /// <summary>Computes where index files for a given input live on disk.</summary>
    private static class FolderLayer
    {
        // normal
        //   <full path of input file>.idx/[archive entry key/]
        // fallback (used when the normal directory cannot be created)
        //   <app base directory>/fallback index/<input file name without extension>/[archive entry key/]
        // file name
        //   <5-digit GiB index>.index.xz, e.g. 00000.index.xz

        private const string IndexFileSuffix = ".index.xz";

        /// <summary>Search pattern matching the files produced by <see cref="MakeIndexFileName"/>.</summary>
        public const string IndexFileSearchPattern = "*" + IndexFileSuffix;

        /// <summary>
        /// Returns the index directory for <paramref name="fd"/>, creating it when missing.
        /// Falls back to a directory under the application base directory when the preferred
        /// location cannot be created.
        /// </summary>
        public static string MakeIndexDirOrFallback(FileDescriptor fd)
        {
            var inputFileFullPath = Path.GetFullPath(fd.FilePath);
            var dir = inputFileFullPath + ".idx";
            if (fd.IsReadDirectly == false && fd.ArchiveEntryKey != null)
            {
                dir = Path.Combine(dir, NormalizeEntryKey(fd.ArchiveEntryKey));
            }

            if (Directory.Exists(dir))
            {
                return dir;
            }

            try
            {
                Directory.CreateDirectory(dir);
                return dir;
            }
            // Read-only or protected locations raise UnauthorizedAccessException rather than
            // IOException; both mean "cannot index next to the input file".
            catch (Exception ex) when (ex is IOException or UnauthorizedAccessException)
            {
                // The key is normalized here as well: Path.Combine discards every preceding
                // segment when a later segment is rooted (starts with a directory separator).
                var fallbackDir = Path.Combine(
                    AppDomain.CurrentDomain.BaseDirectory,
                    "fallback index",
                    Path.GetFileNameWithoutExtension(fd.FilePath),
                    NormalizeEntryKey(fd.ArchiveEntryKey ?? ""));
                Directory.CreateDirectory(fallbackDir);
                return fallbackDir;
            }
        }

        /// <summary>Builds the index file name for the given GiB slice.</summary>
        public static string MakeIndexFileName(int gbIndex) => $"{gbIndex:00000}{IndexFileSuffix}";

        /// <summary>Strips surrounding whitespace and leading slashes so the key is a relative path segment.</summary>
        private static string NormalizeEntryKey(string key) => key.Trim().TrimStart('/', '\\');
    }

    /// <summary>In-memory form of one index file plus its (de)serialization.</summary>
    private class IndexFormat
    {
        // Layout:
        //   int32   - number of entries
        //   int32   - size of the last entry (used to check index integrity)
        //   int64*  - offset of each entry in the input
        //   LINES*  - one text line per entry: "<type> <url>"

        public List<long> Offsets { get; }
        public int? LastEntrySize { get; set; }
        public List<(string url, string type)> Urls { get; }

        /// <summary>
        /// Position just past the last indexed entry, or <c>null</c> when the index has no
        /// entries or <see cref="LastEntrySize"/> is not set.
        /// </summary>
        public long? EndOffset =>
            Offsets.Count > 0 && LastEntrySize is { } size ? Offsets[^1] + size : (long?)null;

        public IndexFormat()
        {
            Offsets = [];
            Urls = [];
        }

        /// <summary>Reads an index from <paramref name="fromStream"/>. The stream is left open for the caller to dispose.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="fromStream"/> is null.</exception>
        /// <exception cref="InvalidDataException">The content is truncated or malformed.</exception>
        public IndexFormat(Stream fromStream)
        {
            if (fromStream == null)
            {
                throw new ArgumentNullException(nameof(fromStream));
            }

            // The binary header is consumed completely before the StreamReader performs its
            // first (buffered) read, so both readers can share the stream.
            using var br = new BinaryReader(fromStream, Encoding.UTF8, leaveOpen: true);
            using var sr = new StreamReader(fromStream, leaveOpen: true);

            var numberOfEntry = br.ReadInt32();
            if (numberOfEntry < 0)
            {
                throw new InvalidDataException($"Invalid index entry count: {numberOfEntry}");
            }

            LastEntrySize = br.ReadInt32();
            Offsets = new List<long>(numberOfEntry);
            Urls = new List<(string, string)>(numberOfEntry);

            for (var i = 0; i < numberOfEntry; i++)
            {
                Offsets.Add(br.ReadInt64());
            }

            for (var i = 0; i < numberOfEntry; i++)
            {
                var line = sr.ReadLine()
                           ?? throw new InvalidDataException($"Index ended after {i} of {numberOfEntry} entry lines");
                var parts = line.Split(' ', 2);
                if (parts.Length != 2)
                {
                    throw new InvalidDataException($"Malformed index line {i}");
                }

                // WriteToStream emits "<type> <url>", so the type is the first field.
                Urls.Add((url: parts[1], type: parts[0]));
            }
        }

        /// <summary>Serializes this index to <paramref name="stream"/>.</summary>
        /// <exception cref="InvalidOperationException">
        /// <see cref="LastEntrySize"/> is not set, or <see cref="Offsets"/> and <see cref="Urls"/> differ in length.
        /// </exception>
        public void WriteToStream(Stream stream)
        {
            if (LastEntrySize.HasValue == false)
            {
                throw new InvalidOperationException("LastEntrySize can not be null");
            }

            var count = Offsets.Count;
            if (count != Urls.Count)
            {
                throw new InvalidOperationException("count not matched between Offsets and Urls");
            }

            // Everything is assembled in memory first and copied to the target in one pass.
            using var ms = new MemoryStream();
            using (var bw = new BinaryWriter(ms, Encoding.ASCII, true))
            {
                bw.Write(count);
                bw.Write(LastEntrySize.Value);
                foreach (var offset in Offsets)
                {
                    bw.Write(offset);
                }
            }

            using (var sw = new StreamWriter(ms, leaveOpen: true))
            {
                foreach (var tuple in Urls)
                {
                    sw.WriteLine($"{tuple.type} {tuple.url}");
                }
            }

            ms.Position = 0;
            ms.CopyTo(stream);
        }
    }

    private readonly CancellationTokenSource _cancellationTokenSource = new();
    private readonly Channel<FileDescriptor> _queue = Channel.CreateBounded<FileDescriptor>(int.MaxValue);
    private readonly IEventBus _eventBus;
    private bool _isWorking = false;
    private FileDescriptor? _currentFileDescriptor;
    private float? _currentProgress;

    public WarcIndexManager(IEventBus eventBus)
    {
        _eventBus = eventBus;

        // NOTE(review): both Subscript calls are reproduced exactly as they appear in the reviewed
        // text, without explicit type arguments. If the event bus API is generic, confirm which
        // event types are subscribed here.
        _eventBus.Subscript(obj => _ = LoadIndexAsync(obj.FileDescriptor));
        _eventBus.Subscript(_ => PublishStatus());
    }

    /// <summary>Publishes the current worker state (busy flag, queue length, current file, progress).</summary>
    private void PublishStatus() =>
        _eventBus.Publish(new WarcIndexerStatusEvent(_isWorking, _queue.Reader.Count, _currentFileDescriptor, _currentProgress));

    /// <summary>
    /// Publishes the entries of every existing index file for <paramref name="fd"/>, in GiB order.
    /// If the index files do not cover the whole input, the file is queued for (further) indexing.
    /// </summary>
    private async Task LoadIndexAsync(FileDescriptor fd)
    {
        // This method is started fire-and-forget, so failures are reported here; otherwise
        // they would vanish with the discarded task.
        try
        {
            var dir = FolderLayer.MakeIndexDirOrFallback(fd);
            var len = await FileReader.GetLengthAsync(fd);

            for (var gbIndex = 0; ; gbIndex++)
            {
                var indexFilePath = Path.Combine(dir, FolderLayer.MakeIndexFileName(gbIndex));
                if (File.Exists(indexFilePath) == false)
                {
                    // Index missing or incomplete: hand the file to the background indexer.
                    await _queue.Writer.WriteAsync(fd);
                    return;
                }

                var indexContent = ReadIndexFile(indexFilePath);
                var batch = new List<WarcIndexEntry>(indexContent.Offsets.Count);
                for (var i = 0; i < indexContent.Offsets.Count; i++)
                {
                    var (url, type) = indexContent.Urls[i];
                    // NOTE(review): arguments are passed positionally as (type, url) — confirm
                    // against the WarcIndexEntry constructor.
                    batch.Add(new WarcIndexEntry(fd, indexContent.Offsets[i], type, url));
                }

                _eventBus.Publish(new WarcIndexerLoadedEvent(batch));

                // The index is complete once its last entry ends exactly at the end of the input.
                if (indexContent.EndOffset == len)
                {
                    return;
                }
            }
        }
        catch (Exception ex)
        {
            System.Diagnostics.Trace.TraceError($"Loading WARC index failed: {ex}");
        }
    }

    /// <summary>Starts the background worker that indexes queued files one at a time.</summary>
    public void Start()
    {
        var token = _cancellationTokenSource.Token;
        Task.Run(async () =>
        {
            while (token.IsCancellationRequested == false)
            {
                _isWorking = false;
                _currentFileDescriptor = null;
                _currentProgress = null;
                PublishStatus();

                try
                {
                    var fileDescriptor = await _queue.Reader.ReadAsync(token);
                    _isWorking = true;
                    _currentFileDescriptor = fileDescriptor;
                    PublishStatus();

                    await IndexFileAsync(fileDescriptor, token);
                }
                catch (OperationCanceledException) when (token.IsCancellationRequested)
                {
                    return;
                }
                catch (Exception ex)
                {
                    // One unreadable input must not stop indexing of the files queued after it.
                    System.Diagnostics.Trace.TraceError($"Indexing WARC file failed: {ex}");
                }
            }
        });
    }

    /// <summary>Requests cancellation of the background worker.</summary>
    public void Stop() => _cancellationTokenSource.Cancel();

    /// <summary>
    /// Indexes <paramref name="fileDescriptor"/>, resuming after the last existing index file.
    /// One index file is written per GiB of input. When cancelled, entries collected since the
    /// last written index file are discarded.
    /// </summary>
    private async Task IndexFileAsync(FileDescriptor fileDescriptor, CancellationToken token)
    {
        var length = await FileReader.GetLengthAsync(fileDescriptor);
        var indexDir = FolderLayer.MakeIndexDirOrFallback(fileDescriptor);

        // Only consider real index files; names are zero-padded, so ordinal order is GiB order.
        var lastExistIndexFile = Directory.GetFiles(indexDir, FolderLayer.IndexFileSearchPattern)
            .OrderBy(p => p, StringComparer.Ordinal)
            .LastOrDefault();

        long loadPosition = 0;
        if (lastExistIndexFile != null)
        {
            loadPosition = ReadIndexFile(lastExistIndexFile).EndOffset
                           ?? throw new InvalidDataException($"Index file '{lastExistIndexFile}' contains no entries");
        }

        // Completely indexed already, or nothing to read (empty input).
        if (loadPosition >= length)
        {
            return;
        }

        // Start (or continue) indexing.
        await using var stream = await FileReader.OpenReadStreamAsync(fileDescriptor);
        stream.SeekForwardStupid(loadPosition);
        _currentProgress = (float)loadPosition / length;
        PublishStatus();

        IndexFormat? indexingFile = null;
        var indexingGb = (int)(loadPosition / Gib);
        do
        {
            if (token.IsCancellationRequested)
            {
                return;
            }

            indexingFile ??= new IndexFormat();

            // Handle one index entry.
            indexingFile.Offsets.Add(loadPosition);
            var block = WarcParser.ReadEntryBlock(stream);
            indexingFile.LastEntrySize = block.Length;
            loadPosition += block.Length;

            // Keep the stored progress current so later status requests see it too.
            _currentProgress = (float)loadPosition / length;
            PublishStatus();

            var entry = WarcParser.ParseBlockForIndexing(block);
            indexingFile.Urls.Add((entry.Url, entry.Type));

            // Crossing a GiB boundary closes the current index file.
            var nowGb = (int)(loadPosition / Gib);
            if (nowGb != indexingGb)
            {
                await WriteIndexFileAsync(indexDir, indexingGb, indexingFile);
                indexingFile = null;
                indexingGb = nowGb;
            }
        } while (loadPosition < length);

        // Write the last, partially filled slice.
        if (indexingFile != null)
        {
            await WriteIndexFileAsync(indexDir, indexingGb, indexingFile);
        }
    }

    /// <summary>Reads one index file and releases the underlying stream.</summary>
    private static IndexFormat ReadIndexFile(string indexFilePath)
    {
        using var xzReadStream = XzRwOps.OpenReadXzFile(indexFilePath);
        return new IndexFormat(xzReadStream);
    }

    /// <summary>Writes <paramref name="index"/> as the index file for slice <paramref name="gbIndex"/>.</summary>
    private static async Task WriteIndexFileAsync(string indexDir, int gbIndex, IndexFormat index)
    {
        var indexFilePath = Path.Combine(indexDir, FolderLayer.MakeIndexFileName(gbIndex));
        await using var xzWriteStream = XzRwOps.OpenWriteXzFile(indexFilePath);
        index.WriteToStream(xzWriteStream);
    }

    public enum IndexingState
    {
        NotIndexed,
        PartIndexed,
        CompletelyIndexed
    }
}