using System.Text;
using System.Threading.Channels;
using WarcViewerBlazorWinForm.Backend.IO;
using WarcViewerBlazorWinForm.Backend.IO.Archiving;
using WarcViewerBlazorWinForm.Library.AssemblyInject.Interfaces;
using WarcViewerBlazorWinForm.Library.EventBus;
namespace WarcViewerBlazorWinForm.Backend.Warc;
internal class WarcIndexManager : IAssemblyInjectSingleton, IAssemblyInjectSyncStarStop
{
private const long Gib = 1024 * 1024 * 1024;
/// <summary>
/// Computes where the index files of a source file live on disk.
/// </summary>
/// <remarks>
/// Directory layout:
///   normal:   &lt;full input file path&gt;.idx/[archive entry key/]
///   fallback: &lt;application base directory&gt;/fallback index/&lt;input file name without extension&gt;/[archive entry key/]
/// File name inside that directory: the GiB index zero-padded to five digits, e.g. 00000.index.xz
/// </remarks>
private static class FolderLayer
{
    /// <summary>
    /// Returns the index directory for <paramref name="fd"/>, creating it when it does not exist.
    /// When the directory next to the input file cannot be created, a directory below the
    /// application base directory is created and returned instead.
    /// </summary>
    /// <param name="fd">Descriptor of the file (or archive entry) being indexed.</param>
    /// <returns>Path of an existing directory that holds (or will hold) the index files.</returns>
    public static string MakeIndexDirOrFallback(FileDescriptor fd)
    {
        var inputFileFullPath = Path.GetFullPath(fd.FilePath);
        var dir = inputFileFullPath + ".idx";
        if (fd.IsReadDirectly == false && fd.ArchiveEntryKey != null)
        {
            dir = Path.Combine(dir, NormalizeArchiveKey(fd.ArchiveEntryKey));
        }

        if (Directory.Exists(dir))
        {
            return dir;
        }

        try
        {
            Directory.CreateDirectory(dir);
            return dir;
        }
        // Missing write permission is reported as UnauthorizedAccessException, which is NOT an
        // IOException; both must lead to the fallback location.
        catch (Exception ex) when (ex is IOException or UnauthorizedAccessException)
        {
            // The key is normalized here as well: a key starting with a directory separator is a
            // rooted path, and Path.Combine would then discard every preceding segment.
            var fallbackDir = Path.Combine(
                AppDomain.CurrentDomain.BaseDirectory,
                "fallback index",
                Path.GetFileNameWithoutExtension(fd.FilePath),
                NormalizeArchiveKey(fd.ArchiveEntryKey ?? ""));
            Directory.CreateDirectory(fallbackDir);
            return fallbackDir;
        }
    }

    /// <summary>Builds the index file name for the given GiB slice, e.g. <c>00003.index.xz</c>.</summary>
    public static string MakeIndexFileName(int gbIndex) => $"{gbIndex:00000}.index.xz";

    // Strips surrounding whitespace and leading directory separators so the key is always relative.
    private static string NormalizeArchiveKey(string key) => key.Trim().TrimStart('/', '\\');
}
/// <summary>
/// In-memory form of one index file.
/// </summary>
/// <remarks>
/// Serialized layout (before xz compression, which is applied by the caller's stream):
///   int32  - number of entries
///   int32  - size of the last entry, used to check index integrity
///   int64* - one entry offset per entry
///   LINES* - one text line per entry: "&lt;type&gt; &lt;url&gt;"
/// </remarks>
private class IndexFormat
{
    /// <summary>Offset of every indexed entry inside the source file.</summary>
    public List<long> Offsets { get; }

    /// <summary>Size of the last indexed entry; must be set before <see cref="WriteToStream"/>.</summary>
    public int? LastEntrySize { get; set; }

    /// <summary>Url and type of every indexed entry, parallel to <see cref="Offsets"/>.</summary>
    public List<(string url, string type)> Urls { get; }

    /// <summary>Creates an empty index.</summary>
    public IndexFormat()
    {
        Offsets = [];
        Urls = [];
    }

    /// <summary>Reads an index from <paramref name="fromStream"/>. The stream is left open.</summary>
    /// <param name="fromStream">Stream positioned at the start of the serialized index.</param>
    /// <exception cref="ArgumentNullException"><paramref name="fromStream"/> is null.</exception>
    /// <exception cref="InvalidDataException">The content does not follow the index layout.</exception>
    public IndexFormat(Stream fromStream)
    {
        ArgumentNullException.ThrowIfNull(fromStream);

        // Both readers leave the stream open: its lifetime belongs to the caller.
        using var br = new BinaryReader(fromStream, Encoding.UTF8, leaveOpen: true);
        var numberOfEntry = br.ReadInt32();
        if (numberOfEntry < 0)
        {
            throw new InvalidDataException($"Invalid entry count in index: {numberOfEntry}");
        }

        LastEntrySize = br.ReadInt32();
        Offsets = new List<long>(numberOfEntry);
        Urls = new List<(string url, string type)>(numberOfEntry);
        for (var i = 0; i < numberOfEntry; i++)
        {
            Offsets.Add(br.ReadInt64());
        }

        // The StreamReader is created only after all binary fields were consumed, because it
        // buffers ahead and would otherwise swallow bytes the BinaryReader still needs.
        using var sr = new StreamReader(fromStream, leaveOpen: true);
        for (var i = 0; i < numberOfEntry; i++)
        {
            var line = sr.ReadLine() ?? throw new InvalidDataException($"Index is truncated: line {i} of {numberOfEntry} is missing");
            // Lines are written as "<type> <url>"; splitting once keeps spaces inside the url.
            var parts = line.Split(' ', 2);
            if (parts.Length != 2)
            {
                throw new InvalidDataException($"Malformed index line {i}: '{line}'");
            }

            Urls.Add((url: parts[1], type: parts[0]));
        }
    }

    /// <summary>Serializes this index to <paramref name="stream"/>.</summary>
    /// <param name="stream">Destination stream; it is not closed.</param>
    /// <exception cref="InvalidOperationException">
    /// <see cref="LastEntrySize"/> is not set, or <see cref="Offsets"/> and <see cref="Urls"/> differ in length.
    /// </exception>
    public void WriteToStream(Stream stream)
    {
        ArgumentNullException.ThrowIfNull(stream);
        if (LastEntrySize.HasValue == false) throw new InvalidOperationException("LastEntrySize can not be null");
        var count = Offsets.Count;
        if (count != Urls.Count) throw new InvalidOperationException("count not matched between Offsets and Urls");

        // Everything is assembled in memory first and copied to the destination at the end.
        using var ms = new MemoryStream();
        using (var bw = new BinaryWriter(ms, Encoding.ASCII, leaveOpen: true))
        {
            bw.Write(count);
            bw.Write(LastEntrySize.Value);
            foreach (var offset in Offsets)
            {
                bw.Write(offset);
            }
        }

        using (var sw = new StreamWriter(ms, leaveOpen: true))
        {
            foreach (var (url, type) in Urls)
            {
                sw.WriteLine($"{type} {url}");
            }
        }

        ms.Position = 0;
        ms.CopyTo(stream);
    }
}
// Cancelled by Stop() to end the background indexing worker.
private readonly CancellationTokenSource _cancellationTokenSource = new();
// Files waiting to be indexed: written by LoadIndexAsync, read by the worker started in Start().
private readonly Channel<FileDescriptor> _queue = Channel.CreateBounded<FileDescriptor>(int.MaxValue);
private readonly IEventBus _eventBus;
// Worker state reported through WarcIndexerStatusEvent.
private bool _isWorking = false;
private FileDescriptor? _currentFileDescriptor;
// Fraction (position / length) of the current file already indexed; null while idle.
private float? _currentProgress;
/// <summary>
/// Stores the event bus and registers the two event handlers of this manager.
/// </summary>
/// <param name="eventBus">Event bus used both to receive requests and to publish index/status events.</param>
public WarcIndexManager(IEventBus eventBus)
{
_eventBus = eventBus;
// Starts loading the index of the file carried by the event. The returned task is discarded
// (fire-and-forget), so it is not awaited here and a failure inside LoadIndexAsync is not observed.
// NOTE(review): no explicit event type argument is visible on these Subscript calls — confirm
// which event types the two handlers are meant to receive.
_eventBus.Subscript(obj => _ = LoadIndexAsync(obj.FileDescriptor));
// Answers the event by publishing the current indexer status.
_eventBus.Subscript(_ => PublishStatus());
}
/// <summary>
/// Publishes a <c>WarcIndexerStatusEvent</c> built from the current worker state:
/// working flag, number of queued files, current file and current progress.
/// </summary>
private void PublishStatus()
{
    var status = new WarcIndexerStatusEvent(_isWorking, _queue.Reader.Count, _currentFileDescriptor, _currentProgress);
    _eventBus.Publish(status);
}
/// <summary>
/// Publishes the already existing index of <paramref name="fd"/>, one
/// <c>WarcIndexerLoadedEvent</c> per index file, and queues the file for indexing when the
/// existing index does not cover it up to its end.
/// </summary>
/// <param name="fd">Descriptor of the file whose index should be loaded.</param>
private async Task LoadIndexAsync(FileDescriptor fd)
{
    var dir = FolderLayer.MakeIndexDirOrFallback(fd);
    var len = await FileReader.GetLengthAsync(fd);
    for (var gbIndex = 0; ; gbIndex++)
    {
        var indexFilePath = Path.Combine(dir, FolderLayer.MakeIndexFileName(gbIndex));
        if (File.Exists(indexFilePath) == false)
        {
            // The index stops before the end of the file: hand the file to the indexing worker.
            await _queue.Writer.WriteAsync(fd);
            return;
        }

        IndexFormat indexContent;
        // Dispose the xz stream as soon as the index is parsed so the file handle is not leaked.
        using (var indexStream = XzRwOps.OpenReadXzFile(indexFilePath))
        {
            indexContent = new IndexFormat(indexStream);
        }

        var count = indexContent.Offsets.Count;
        if (count == 0)
        {
            // An index file without entries carries no information; look at the next one.
            continue;
        }

        var batch = new List<WarcIndexEntry>(count);
        for (var i = 0; i < count; i++)
        {
            var (url, type) = indexContent.Urls[i];
            batch.Add(new WarcIndexEntry(fd, indexContent.Offsets[i], type, url));
        }

        _eventBus.Publish(new WarcIndexerLoadedEvent(batch));

        // The last entry ends exactly at the end of the file: the index is complete.
        if (indexContent.Offsets[count - 1] + indexContent.LastEntrySize == len)
        {
            return;
        }
    }
}
/// <summary>
/// Starts the background worker that takes files from the queue and indexes them
/// until <see cref="Stop"/> is called.
/// </summary>
public void Start()
{
    var token = _cancellationTokenSource.Token;
    // Intentionally not awaited: the worker lives until cancellation and handles its own errors.
    _ = Task.Run(() => RunIndexingLoopAsync(token));
}

// Worker loop: publishes the idle status, waits for the next queued file and indexes it.
private async Task RunIndexingLoopAsync(CancellationToken token)
{
    while (token.IsCancellationRequested == false)
    {
        _isWorking = false;
        _currentFileDescriptor = null;
        _currentProgress = null;
        PublishStatus();

        FileDescriptor fileDescriptor;
        try
        {
            fileDescriptor = await _queue.Reader.ReadAsync(token);
        }
        catch (OperationCanceledException)
        {
            // Stop() was called while waiting for work.
            return;
        }

        _isWorking = true;
        _currentFileDescriptor = fileDescriptor;
        PublishStatus();

        try
        {
            await IndexFileAsync(fileDescriptor, token);
        }
        catch (OperationCanceledException) when (token.IsCancellationRequested)
        {
            return;
        }
        catch (Exception ex)
        {
            // One unreadable file must not end the worker for every file queued after it.
            System.Diagnostics.Trace.TraceError($"Indexing of '{fileDescriptor.FilePath}' failed: {ex}");
        }
    }
}

// Indexes one file, resuming after the last entry recorded in its newest existing index file.
// One index file is written per GiB of source data.
private async Task IndexFileAsync(FileDescriptor fileDescriptor, CancellationToken token)
{
    var length = await FileReader.GetLengthAsync(fileDescriptor);
    var indexDir = FolderLayer.MakeIndexDirOrFallback(fileDescriptor);

    // Only index files are considered; names are zero-padded, so ordinal order is numeric order.
    var lastExistIndexFile = Directory.GetFiles(indexDir, "*.index.xz")
        .OrderBy(p => p, StringComparer.Ordinal)
        .LastOrDefault();

    long loadPosition = 0;
    if (lastExistIndexFile != null)
    {
        IndexFormat indexContent;
        using (var indexStream = XzRwOps.OpenReadXzFile(lastExistIndexFile))
        {
            indexContent = new IndexFormat(indexStream);
        }

        var count = indexContent.Offsets.Count;
        if (count > 0)
        {
            loadPosition = indexContent.Offsets[count - 1] + indexContent.LastEntrySize!.Value;
        }
    }

    if (loadPosition >= length)
    {
        // Completely indexed already, or there is nothing to read (empty source).
        return;
    }

    // Start (or continue) indexing.
    await using var stream = await FileReader.OpenReadStreamAsync(fileDescriptor);
    stream.SeekForwardStupid(loadPosition);
    _currentProgress = (float)loadPosition / length;
    PublishStatus();

    IndexFormat? indexingFile = null;
    var indexingGb = (int)(loadPosition / Gib);
    var lastBlockSize = 0;
    while (loadPosition < length)
    {
        if (token.IsCancellationRequested)
        {
            // The unfinished GiB slice is dropped; it is indexed again on the next run.
            return;
        }

        indexingFile ??= new IndexFormat();

        // Handle one index entry.
        indexingFile.Offsets.Add(loadPosition);
        var block = WarcParser.ReadEntryBlock(stream);
        if (block.Length == 0)
        {
            // Without progress the loop would never end.
            throw new InvalidDataException($"Empty entry block at offset {loadPosition}");
        }

        lastBlockSize = block.Length;
        loadPosition += block.Length;
        // Keep the field in sync so that a later status request reports the same progress.
        _currentProgress = (float)loadPosition / length;
        PublishStatus();

        var entry = WarcParser.ParseBlockForIndexing(block);
        indexingFile.Urls.Add((entry.Url, entry.Type));

        // Crossing a GiB boundary closes the current index file and starts the next one.
        var nowGb = (int)(loadPosition / Gib);
        if (nowGb != indexingGb)
        {
            indexingFile.LastEntrySize = lastBlockSize;
            await WriteIndexFileAsync(indexDir, indexingGb, indexingFile);
            indexingFile = null;
            indexingGb = nowGb;
        }
    }

    if (indexingFile != null)
    {
        // Write the last, partially filled slice.
        indexingFile.LastEntrySize = lastBlockSize;
        await WriteIndexFileAsync(indexDir, indexingGb, indexingFile);
    }
}

// Writes one index slice as an xz file named after its GiB index.
private static async Task WriteIndexFileAsync(string indexDir, int gbIndex, IndexFormat index)
{
    await using var xzWriteStream = XzRwOps.OpenWriteXzFile(Path.Combine(indexDir, FolderLayer.MakeIndexFileName(gbIndex)));
    index.WriteToStream(xzWriteStream);
}
/// <summary>
/// Signals cancellation on the manager's cancellation token source.
/// </summary>
public void Stop()
{
    _cancellationTokenSource.Cancel();
}
/// <summary>How much of a file is covered by its index.</summary>
public enum IndexingState
{
    /// <summary>No index is available.</summary>
    NotIndexed = 0,

    /// <summary>Only a part is indexed.</summary>
    PartIndexed = 1,

    /// <summary>The index is complete.</summary>
    CompletelyIndexed = 2
}
}