123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249 |
- using System.Text;
- using System.Threading.Channels;
- using WarcViewerBlazorWinForm.Backend.IO;
- using WarcViewerBlazorWinForm.Backend.IO.Archiving;
- using WarcViewerBlazorWinForm.Library.AssemblyInject.Interfaces;
- using WarcViewerBlazorWinForm.Library.EventBus;
- namespace WarcViewerBlazorWinForm.Backend.Warc;
- internal class WarcIndexManager : IAssemblyInjectSingleton, IAssemblyInjectSyncStarStop
- {
// One gibibyte (2^30 bytes); the index of an input file is split into one index file per GiB.
private const long Gib = 1L << 30;
/// <summary>
/// Decides where the index files of an input file are stored and how they are named.
/// </summary>
private static class FolderLayer
{
    // Normal layout:
    //   <input file>.idx/[archive key/]
    // Fallback layout, used when the normal directory cannot be created:
    //   <app path>/fallback index/<input file name without extension>/[archive key/]
    // Index file name (zero-padded, five digits):
    //   00000.index.xz

    /// <summary>
    /// Returns the directory that holds the index files for <paramref name="fd"/>, creating it when
    /// it does not exist yet. When the directory next to the input file cannot be created, a
    /// directory below the application base directory is created and returned instead.
    /// </summary>
    /// <param name="fd">Descriptor of the input file (and optionally the entry inside an archive).</param>
    /// <returns>Path of an existing directory.</returns>
    public static string MakeIndexDirOrFallback(FileDescriptor fd)
    {
        var inputFileFullPath = Path.GetFullPath(fd.FilePath);

        // NOTE(review): the archive key is used as a relative path as-is; a key containing ".."
        // segments would point outside the index directory - confirm keys are trusted.
        var archiveKey = NormalizeArchiveKey(fd.ArchiveEntryKey);

        var dir = inputFileFullPath + ".idx";
        if (fd.IsReadDirectly == false && fd.ArchiveEntryKey != null) dir = Path.Combine(dir, archiveKey);
        if (Directory.Exists(dir)) return dir;

        try
        {
            Directory.CreateDirectory(dir);
            return dir;
        }
        // A read-only location raises UnauthorizedAccessException, which is not an IOException;
        // both must lead to the fallback directory.
        catch (Exception ex) when (ex is IOException or UnauthorizedAccessException)
        {
            // The key is normalized here as well: Path.Combine discards every preceding segment
            // when a later segment is rooted (for example a key that starts with '/').
            var fallbackDir = Path.Combine(
                AppDomain.CurrentDomain.BaseDirectory,
                "fallback index",
                Path.GetFileNameWithoutExtension(fd.FilePath),
                archiveKey);
            Directory.CreateDirectory(fallbackDir);
            return fallbackDir;
        }
    }

    /// <summary>Builds the index file name for the given GiB slice, e.g. <c>00003.index.xz</c>.</summary>
    public static string MakeIndexFileName(int gbIndex) => $"{gbIndex:00000}.index.xz";

    // Strips surrounding whitespace and leading path separators so the key is a relative path segment.
    private static string NormalizeArchiveKey(string? key) => (key ?? "").Trim().TrimStart('/', '\\');
}
/// <summary>
/// In-memory representation of one index file, with binary (de)serialization.
/// </summary>
private class IndexFormat
{
    // Serialized layout:
    //   int32   - number of entries
    //   int32   - size of the last entry, used to check index integrity
    //   int64*  - offset of each entry
    //   LINES*  - one text line per entry: <TYPE> <URL>   (TYPE: M, S, R, O)

    /// <summary>Offset of every indexed entry, in entry order.</summary>
    public List<long> Offsets { get; }

    /// <summary>Size of the last entry; must be set before <see cref="WriteToStream"/> is called.</summary>
    public int? LastEntrySize { get; set; }

    /// <summary>URL and type of every indexed entry, parallel to <see cref="Offsets"/>.</summary>
    public List<(string url, string type)> Urls { get; }

    /// <summary>Creates an empty index.</summary>
    public IndexFormat()
    {
        Offsets = [];
        Urls = [];
    }

    /// <summary>
    /// Reads a complete index from <paramref name="fromStream"/>. The stream is left open; the
    /// caller stays responsible for disposing it.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="fromStream"/> is null.</exception>
    /// <exception cref="InvalidDataException">The entry count is negative, or a URL line is missing or malformed.</exception>
    /// <exception cref="EndOfStreamException">The binary header or offset table is truncated.</exception>
    public IndexFormat(Stream fromStream)
    {
        ArgumentNullException.ThrowIfNull(fromStream);

        // The binary part is read completely before the first text line is requested, so the
        // StreamReader's internal buffering cannot swallow bytes the BinaryReader still needs.
        using var br = new BinaryReader(fromStream, Encoding.UTF8, leaveOpen: true);
        using var sr = new StreamReader(fromStream, leaveOpen: true);

        var numberOfEntry = br.ReadInt32();
        if (numberOfEntry < 0) throw new InvalidDataException($"Invalid index: negative entry count {numberOfEntry}.");
        LastEntrySize = br.ReadInt32();

        Offsets = new List<long>(numberOfEntry);
        Urls = new List<(string, string)>(numberOfEntry);
        for (var i = 0; i < numberOfEntry; i++) Offsets.Add(br.ReadInt64());

        for (var i = 0; i < numberOfEntry; i++)
        {
            var line = sr.ReadLine()
                       ?? throw new InvalidDataException($"Invalid index: expected {numberOfEntry} URL lines but found only {i}.");
            var separator = line.IndexOf(' ');
            if (separator < 0) throw new InvalidDataException($"Invalid index: URL line {i} has no type/url separator.");

            // WriteToStream emits "<type> <url>": the text before the first space is the type and
            // the remainder is the URL.
            Urls.Add((line[(separator + 1)..], line[..separator]));
        }
    }

    /// <summary>
    /// Serializes the index and writes it to <paramref name="stream"/>. The data is assembled in
    /// memory first and copied to the target in one step.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// <see cref="LastEntrySize"/> is not set, or <see cref="Offsets"/> and <see cref="Urls"/> differ in length.
    /// </exception>
    public void WriteToStream(Stream stream)
    {
        ArgumentNullException.ThrowIfNull(stream);
        if (LastEntrySize.HasValue == false) throw new InvalidOperationException("LastEntrySize can not be null");
        var count = Offsets.Count;
        if (count != Urls.Count) throw new InvalidOperationException("count not matched between Offsets and Urls");

        using var ms = new MemoryStream();

        // Binary header and offset table.
        using (var bw = new BinaryWriter(ms, Encoding.ASCII, leaveOpen: true))
        {
            bw.Write(count);
            bw.Write(LastEntrySize.Value);
            foreach (var offset in Offsets) bw.Write(offset);
        }

        // Text part: the type goes first so a URL containing spaces still splits unambiguously.
        using (var sw = new StreamWriter(ms, leaveOpen: true))
        {
            foreach (var (url, type) in Urls) sw.WriteLine($"{type} {url}");
        }

        ms.Position = 0;
        ms.CopyTo(stream);
    }
}
// Event bus used to receive requests and to publish loaded entries and status snapshots.
private readonly IEventBus _eventBus;

// Cancelled by Stop(); observed by the background worker launched in Start().
private readonly CancellationTokenSource _cancellationTokenSource = new();

// Files waiting to be indexed: written by LoadIndexAsync, consumed by the worker in Start().
private readonly Channel<FileDescriptor> _queue = Channel.CreateBounded<FileDescriptor>(int.MaxValue);

// Snapshot of the worker's state as reported by PublishStatus().
private bool _isWorking;
private FileDescriptor? _currentFileDescriptor;
private float? _currentProgress;
/// <summary>
/// Creates the manager and registers its two event-bus handlers: load requests start
/// <see cref="LoadIndexAsync"/> without awaiting it, and status requests are answered with the
/// current status snapshot.
/// </summary>
/// <param name="eventBus">Bus used for incoming requests and outgoing events.</param>
public WarcIndexManager(IEventBus eventBus)
{
    _eventBus = eventBus;

    // Fire-and-forget: the returned task is deliberately discarded.
    eventBus.Subscript<WarcIndexerLoadRequestEvent>(request => _ = LoadIndexAsync(request.FileDescriptor));

    eventBus.Subscript<WarcIndexerStatusRequestEvent>(_ => PublishStatus());
}
/// <summary>
/// Publishes a <c>WarcIndexerStatusEvent</c> built from the current worker state: working flag,
/// number of queued files, file being indexed and its progress.
/// </summary>
private void PublishStatus()
{
    var snapshot = new WarcIndexerStatusEvent(_isWorking, _queue.Reader.Count, _currentFileDescriptor, _currentProgress);
    _eventBus.Publish(snapshot);
}
/// <summary>
/// Loads the existing index files of <paramref name="fd"/> and publishes their entries, one
/// <c>WarcIndexerLoadedEvent</c> per index file. When an expected index file is missing (the
/// file is not, or only partly, indexed) the descriptor is queued for the background indexer.
/// </summary>
/// <param name="fd">Descriptor of the file whose index should be loaded.</param>
/// <exception cref="InvalidDataException">An index file contains no entries.</exception>
private async Task LoadIndexAsync(FileDescriptor fd)
{
    var dir = FolderLayer.MakeIndexDirOrFallback(fd);
    var len = await FileReader.GetLengthAsync(fd);
    var gbIndex = 0;
    while (true)
    {
        var indexFilePath = Path.Combine(dir, FolderLayer.MakeIndexFileName(gbIndex));
        if (File.Exists(indexFilePath) == false)
        {
            // Nothing (more) on disk for this file: hand it to the indexer.
            await _queue.Writer.WriteAsync(fd);
            return;
        }

        // The constructor reads the whole index eagerly, so the file can be closed right away
        // instead of leaking the handle.
        IndexFormat indexContent;
        using (var xzReadStream = XzRwOps.OpenReadXzFile(indexFilePath))
        {
            indexContent = new IndexFormat(xzReadStream);
        }

        var entryCount = indexContent.Offsets.Count;
        if (entryCount == 0) throw new InvalidDataException($"Index file '{indexFilePath}' contains no entries.");

        var batch = new List<WarcIndexEntry>(entryCount);
        for (var i = 0; i < entryCount; i++)
        {
            var (url, type) = indexContent.Urls[i];
            batch.Add(new WarcIndexEntry(fd, indexContent.Offsets[i], type, url));
        }
        _eventBus.Publish(new WarcIndexerLoadedEvent(batch));

        // End position covered by this index file; equal to the file length once fully indexed.
        var indexedEnd = indexContent.Offsets[entryCount - 1] + indexContent.LastEntrySize!.Value;
        if (indexedEnd == len)
        {
            return;
        }

        // The indexer names the next index file after the GiB in which the previous one ended,
        // so a single entry spanning more than one GiB skips a number. Follow the same rule
        // here, but always advance by at least one so a damaged index cannot stall the loop.
        gbIndex = Math.Max(gbIndex + 1, (int)(indexedEnd / Gib));
    }
}
/// <summary>
/// Starts the background worker. The worker takes queued files one at a time, indexes each of
/// them and publishes status events while doing so. It runs until <see cref="Stop"/> cancels it.
/// A failure while indexing one file is traced and the worker continues with the next file.
/// </summary>
public void Start()
{
    var token = _cancellationTokenSource.Token;
    Task.Run(async () =>
    {
        while (token.IsCancellationRequested == false)
        {
            // Idle state between two files.
            _isWorking = false;
            _currentFileDescriptor = null;
            _currentProgress = null;
            PublishStatus();

            FileDescriptor fileDescriptor;
            try
            {
                fileDescriptor = await _queue.Reader.ReadAsync(token);
            }
            catch (OperationCanceledException)
            {
                return; // cancelled while waiting for work
            }

            _isWorking = true;
            _currentFileDescriptor = fileDescriptor;
            PublishStatus();

            try
            {
                await IndexFileAsync(fileDescriptor, token);
            }
            catch (Exception ex)
            {
                // Nobody awaits this task, so an escaping exception would silently end the
                // worker and leave the published status stuck on "working". Record the
                // failure and carry on with the next queued file.
                System.Diagnostics.Trace.TraceError($"Indexing '{fileDescriptor.FilePath}' failed: {ex}");
            }
        }
    });
}

/// <summary>
/// Indexes one file, continuing after the end of its last existing index file if there is one.
/// Returns early when the file is already completely indexed or when cancellation is requested.
/// </summary>
/// <exception cref="InvalidDataException">The parser returned an empty block, so no progress is possible.</exception>
private async Task IndexFileAsync(FileDescriptor fileDescriptor, CancellationToken token)
{
    var length = await FileReader.GetLengthAsync(fileDescriptor);
    var indexDir = FolderLayer.MakeIndexDirOrFallback(fileDescriptor);

    // Only index files count; anything else that ended up in the directory is ignored.
    var lastExistIndexFile = Directory.GetFiles(indexDir, "*.index.xz")
        .OrderBy(p => p, StringComparer.Ordinal)
        .LastOrDefault();

    long loadPosition = 0;
    if (lastExistIndexFile != null)
    {
        // The index is read eagerly; close the file immediately instead of leaking the handle.
        IndexFormat indexContent;
        using (var xzReadStream = XzRwOps.OpenReadXzFile(lastExistIndexFile))
        {
            indexContent = new IndexFormat(xzReadStream);
        }
        loadPosition = indexContent.Offsets.Last() + indexContent.LastEntrySize!.Value;
        if (loadPosition == length) return; // skip completely indexed
    }

    // Start (or continue) indexing.
    await using var stream = await FileReader.OpenReadStreamAsync(fileDescriptor);
    stream.SeekForwardStupid(loadPosition);
    _currentProgress = (float)loadPosition / length;
    PublishStatus();

    // NOTE(review): when indexing resumes inside the same GiB as the last existing index file,
    // that file is rewritten with only the newly read entries - confirm whether it can happen.
    IndexFormat? indexingFile = null;
    var indexingGb = (int)(loadPosition / Gib);
    int lastBlockSize;
    do
    {
        if (token.IsCancellationRequested) return;
        indexingFile ??= new IndexFormat();

        // Handle one entry.
        var entryOffset = loadPosition;
        var block = WarcParser.ReadEntryBlock(stream);
        // An empty block would never advance loadPosition and the loop would spin forever.
        if (block.Length == 0) throw new InvalidDataException($"Empty entry block at offset {entryOffset}.");
        indexingFile.Offsets.Add(entryOffset);
        lastBlockSize = block.Length;
        loadPosition += block.Length;

        // Keep the stored progress current so status requests see the same value.
        _currentProgress = (float)loadPosition / length;
        PublishStatus();

        var entry = WarcParser.ParseBlockForIndexing(block);
        indexingFile.Urls.Add((entry.Url, entry.Type));

        // Crossing a GiB boundary closes the current index file and starts the next one.
        var nowGb = (int)(loadPosition / Gib);
        if (nowGb != indexingGb)
        {
            indexingFile.LastEntrySize = block.Length;
            await WriteIndexFileAsync(indexDir, indexingGb, indexingFile);
            indexingFile = null;
            indexingGb = nowGb;
        }
    } while (loadPosition < length);

    if (indexingFile != null) // write last part of index
    {
        indexingFile.LastEntrySize = lastBlockSize;
        await WriteIndexFileAsync(indexDir, indexingGb, indexingFile);
    }
}

/// <summary>Writes <paramref name="index"/> as the XZ index file of the given GiB slice.</summary>
private static async Task WriteIndexFileAsync(string indexDir, int gbIndex, IndexFormat index)
{
    await using var xzWriteStream = XzRwOps.OpenWriteXzFile(Path.Combine(indexDir, FolderLayer.MakeIndexFileName(gbIndex)));
    index.WriteToStream(xzWriteStream);
}
/// <summary>Requests cancellation of the background worker launched by <see cref="Start"/>.</summary>
public void Stop()
{
    _cancellationTokenSource.Cancel();
}
/// <summary>
/// Indexing states of a file. The numeric values are spelled out; they equal the implicit ones.
/// </summary>
public enum IndexingState
{
    NotIndexed = 0,
    PartIndexed = 1,
    CompletelyIndexed = 2
}
- }
|