Commit 0be738e7 authored by Guillaume Perréal's avatar Guillaume Perréal
Browse files

Split le cache.

parent f2718fac
This diff is collapsed.
import { HttpResponseBase } from '@angular/common/http'; import { HttpResponseBase } from '@angular/common/http';
import * as _ from 'lodash'; import { Observable } from 'rxjs';
import { Observable, of, race, Subject, throwError } from 'rxjs'; import { map, switchMap, tap } from 'rxjs/operators';
import { map, switchMap, take, tap } from 'rxjs/operators';
import { import {
AbstractResourceCache, AbstractResourceCache,
...@@ -11,75 +10,9 @@ import { ...@@ -11,75 +10,9 @@ import {
IRI, IRI,
IRI_PROPERTY, IRI_PROPERTY,
Resource, Resource,
} from '../../shared/models'; } from '../types';
import { safeForkJoin } from '../../shared/rxjs';
export class APICacheError extends Error {}
export class MissingIRIError extends APICacheError {
public constructor() {
super(`resource must have an ${IRI_PROPERTY} property`);
}
}
export class IRIMismatchError extends APICacheError {
public constructor(expected: any, actual: any) {
super(`${IRI_PROPERTY}s mismatch: ${actual} !== ${expected}`);
}
}
/**
* ValueHolder gère les requêtes d'une seule ressource.
*
* @internal
*/
export class ValueHolder<R extends Resource> {
private readonly value$ = new Subject<R>();
private readonly value = {} as R;
private version = 0;
constructor(private readonly iri: IRI<R>) {}
public set(value: R): Observable<R> {
if (!(IRI_PROPERTY in value)) {
return throwError(new MissingIRIError());
}
if (value[IRI_PROPERTY] !== this.iri) {
return throwError(new IRIMismatchError(this.iri, value[IRI_PROPERTY]));
}
_.assign(this.value, value);
_(this.value)
.keys()
.difference(_.keys(value))
.forEach((key) => delete this.value[key]);
this.version++;
this.value$.next(this.value);
return of(this.value);
}
public listen(queryFactory: () => Observable<R>): Observable<R> {
if (this.version > 0) {
return of(this.value);
}
return this.update(queryFactory());
}
public update(request$: Observable<R>): Observable<R> {
return race(this.value$.pipe(take(1)), request$.pipe(switchMap((item: R) => this.set(item))));
}
public invalidate(): void {
this.version = 0;
}
public delete(): void {
this.value$.complete();
}
}
import { ValueHolder } from './value-holder';
/** /**
* Implémentation d'un cache de resource. * Implémentation d'un cache de resource.
* *
...@@ -123,10 +56,11 @@ export class ResourceCache extends AbstractResourceCache { ...@@ -123,10 +56,11 @@ export class ResourceCache extends AbstractResourceCache {
public delete<R extends Resource>(iri: IRI<R>, query$: Observable<HttpResponseBase>): Observable<HttpResponseBase> { public delete<R extends Resource>(iri: IRI<R>, query$: Observable<HttpResponseBase>): Observable<HttpResponseBase> {
return query$.pipe( return query$.pipe(
tap(() => { tap(() => {
if (!this.holders.has(iri)) { const holder = this.holders.get(iri);
return; if (holder) {
this.holders.delete(iri);
holder.delete();
} }
this.holders.get(iri).delete();
this.holders.delete(iri); this.holders.delete(iri);
}), }),
); );
...@@ -141,7 +75,11 @@ export class ResourceCache extends AbstractResourceCache { ...@@ -141,7 +75,11 @@ export class ResourceCache extends AbstractResourceCache {
const members = getCollectionMembers(coll); const members = getCollectionMembers(coll);
const memberObservables$ = members.map((item) => this.received(item)); const memberObservables$ = members.map((item) => this.received(item));
return safeForkJoin(memberObservables$).pipe( return safeForkJoin(memberObservables$).pipe(
map((items) => Object.assign({} as Collection<R>, coll, { [COLLECTION_MEMBERS]: items })), map((items) =>
Object.assign({} as Collection<R>, coll, {
[COLLECTION_MEMBERS]: items,
}),
),
); );
}), }),
); );
...@@ -151,17 +89,17 @@ export class ResourceCache extends AbstractResourceCache { ...@@ -151,17 +89,17 @@ export class ResourceCache extends AbstractResourceCache {
* Invalide la valeur d'une IRI pour forcer une mise-à-jour. * Invalide la valeur d'une IRI pour forcer une mise-à-jour.
*/ */
public invalidate<R extends Resource>(iri: IRI<R>): void { public invalidate<R extends Resource>(iri: IRI<R>): void {
if (!this.holders.has(iri)) { const holder = this.holders.get(iri);
return; if (holder) {
holder.invalidate();
} }
this.holders.get(iri).invalidate();
} }
/** /**
* Retourne le ValueHolder d'une IRI, ou le crée si nécessaire. * Retourne le ValueHolder d'une IRI, ou le crée si nécessaire.
*/ */
private getHolder<R extends Resource>(iri: IRI<R>): ValueHolder<R> { private getHolder<R extends Resource>(iri: IRI<R>): ValueHolder<R> {
let holder = this.holders.get(iri) as ValueHolder<R>; let holder = this.holders.get(iri) as ValueHolder<R> | undefined;
if (!holder) { if (!holder) {
holder = new ValueHolder<R>(iri); holder = new ValueHolder<R>(iri);
this.holders.set(iri, holder); this.holders.set(iri, holder);
......
import { IRI_PROPERTY } from '../types';
export class APICacheError extends Error {}
export class MissingIRIError extends APICacheError {
public constructor() {
super(`resource must have an ${IRI_PROPERTY} property`);
}
}
export class IRIMismatchError extends APICacheError {
public constructor(expected: string, actual: string) {
super(`${IRI_PROPERTY}s mismatch: ${actual} !== ${expected}`);
}
}
export * from './cache.service';
export * from './errors';
import { forkJoin } from 'rxjs';
import { IRI, IRI_PROPERTY, Resource } from '../types';
interface MyResource extends Resource {
readonly '@id': IRI<MyResource>;
readonly '@type': 'MyResource';
value: string;
value2?: string;
}
function iri(x: string): IRI<MyResource> {
return x as any;
}
const MY_IRI = iri('/bla/a');
const OTHER_IRI = iri('/bla/B');
const VALUES: { [name: string]: MyResource } = {
a: { '@id': MY_IRI, '@type': 'MyResource', value: 'foo' },
b: { '@id': MY_IRI, '@type': 'MyResource', value: 'bar' },
c: {
'@id': MY_IRI,
'@type': 'MyResource',
value: 'bar',
value2: 'quz',
},
d: { '@id': OTHER_IRI, '@type': 'MyResource', value: 'zig' },
};
let scheduler: MarbleTestScheduler<any>;
beforeEach(() => {
scheduler = MarbleTestScheduler.create(VALUES, 'error');
});
describe('ValueHolder', () => {
let holder: ValueHolder<any>;
beforeEach(() => {
holder = new ValueHolder<MyResource>(iri('/bla/a'));
});
it('should be created', () => {
expect(holder).toBeTruthy();
});
describe('.set()', () => {
function testSet({ value, error, SET_M }: any) {
scheduler.withError(error).run(({ expectObservable }) => {
expectObservable(holder.set(value)).toBe(SET_M);
});
}
it('should provide the value', () =>
testSet({
value: VALUES.a,
SET_M: '(a|)',
}));
it(`should refuse value without ${IRI_PROPERTY}`, () =>
testSet({
value: {},
error: new MissingIRIError(),
SET_M: '#',
}));
it('should refuse value with different @id', () =>
testSet({
value: { '@id': iri('bar') },
error: new IRIMismatchError(MY_IRI, iri('bar')),
SET_M: '#',
}));
it('should always points to the same instance', () => {
forkJoin(holder.set(VALUES.a), holder.set(VALUES.b)).subscribe(([a, b]: MyResource[]) => {
expect(a).toBe(b);
});
});
});
describe('.update()', () => {
it('should provide the value from the server', () =>
scheduler.run(({ cold, expectObservable }) => {
const REQ_M = '---a|';
const UPD_M = '--(a|) ';
const request$ = cold(REQ_M);
expectObservable(holder.update(request$)).toBe(UPD_M);
}));
it('should cancel pending requests', () => {
const LOCAL_VALUES = {
a: VALUES.a,
b: VALUES.b,
j: [VALUES.b, VALUES.b],
};
const REQ1_M = '---a|';
const REQ2_M = 'b| ';
const UPDA_M = '(j|) ';
const REQ1_S = '(^!) ';
const REQ2_S = '(^!) ';
scheduler.withValues(LOCAL_VALUES).run(({ cold, expectObservable, expectSubscriptions }) => {
const request1$ = cold(REQ1_M);
const request2$ = cold(REQ2_M);
expectObservable(
safeForkJoin([
//
holder.update(request1$),
holder.update(request2$),
]),
).toBe(UPDA_M);
expectSubscriptions(request1$.subscriptions).toBe(REQ1_S);
expectSubscriptions(request2$.subscriptions).toBe(REQ2_S);
});
});
it('should propagate errors', () =>
scheduler.run(({ cold, expectObservable }) => {
const REQ_M = '#';
const UPD_M = '#';
const request$ = cold(REQ_M);
expectObservable(holder.update(request$)).toBe(UPD_M);
}));
it('should restart on errors', () =>
scheduler.run(({ cold, expectObservable }) => {
const REQ1_M = '#';
const UPD1_M = '#';
const obs$ = holder.update(cold(REQ1_M));
expectObservable(obs$).toBe(UPD1_M);
const REQ2_M = '-a|';
const UPD2_M = '-a|';
const obs2$ = holder.update(cold(REQ2_M));
expectObservable(obs2$).toBe(UPD2_M);
}));
});
describe('.listen()', () => {
function testListen({ REQUEST_M, LISTEN_M, initial }: any) {
scheduler.run(({ cold, expectObservable }) => {
if (initial) {
holder.set(initial);
}
expectObservable(holder.listen(() => cold(REQUEST_M))).toBe(LISTEN_M);
});
}
it('should provide the value', () =>
testListen({
initial: VALUES.a,
REQUEST_M: /**/ ' ',
LISTEN_M: /***/ '(a|)',
}));
it('should cache the value', () =>
testListen({
initial: VALUES.a,
REQUEST_M: /**/ 'b| ',
LISTEN_M: /***/ '(a|)',
}));
it('should propagate errors', () =>
testListen({
REQUEST_M: /**/ '#',
LISTEN_M: /***/ '#',
}));
});
it('.invalidate() should cause the value to be requested again', () =>
scheduler.run(({ cold, expectObservable }) => {
const REQUEST_M = /**/ '(a|)';
const LISTEN_M = /***/ '(a|)';
const requestFactory = jasmine.createSpy('requestFactory');
requestFactory.and.returnValue(cold(REQUEST_M));
return holder
.set(VALUES.a)
.toPromise()
.then(() => holder.invalidate())
.then(() => expectObservable(holder.listen(requestFactory)).toBe(LISTEN_M))
.then(() => expect(requestFactory).toHaveBeenCalled());
}));
});
import { Observable, of, race, Subject, throwError } from 'rxjs';
import * as _ from 'lodash';
import { switchMap, take } from 'rxjs/operators';
import { IRI, IRI_PROPERTY, Resource } from '../types';
import { IRIMismatchError, MissingIRIError } from './errors';
/**
* ValueHolder gère les requêtes d'une seule ressource.
*
* @internal
*/
export class ValueHolder<R extends Resource> {
private readonly value$ = new Subject<R>();
private readonly value = {} as R;
private version = 0;
constructor(private readonly iri: IRI<R>) {}
public set(value: R): Observable<R> {
if (!(IRI_PROPERTY in value)) {
return throwError(new MissingIRIError());
}
if (value[IRI_PROPERTY] !== this.iri) {
return throwError(new IRIMismatchError(this.iri.toString(), value[IRI_PROPERTY].toString()));
}
_.assign(this.value, value);
_(this.value)
.keys()
.difference(_.keys(value))
.forEach((key) => delete this.value[key]);
this.version++;
this.value$.next(this.value);
return of(this.value);
}
public listen(queryFactory: () => Observable<R>): Observable<R> {
if (this.version > 0) {
return of(this.value);
}
return this.update(queryFactory());
}
public update(request$: Observable<R>): Observable<R> {
return race(this.value$.pipe(take(1)), request$.pipe(switchMap((item: R) => this.set(item))));
}
public invalidate(): void {
this.version = 0;
}
public delete(): void {
this.value$.complete();
}
}
...@@ -49,11 +49,16 @@ export interface APIMetadataRegistry<API extends APIMeta> { ...@@ -49,11 +49,16 @@ export interface APIMetadataRegistry<API extends APIMeta> {
* Registre de métadonnées qui construit les instances à la demande (ce qui permet de gérer les * Registre de métadonnées qui construit les instances à la demande (ce qui permet de gérer les
* dépendances entre métadonnées). * dépendances entre métadonnées).
*/ */
export class LazyMetadataRegistry<API extends APIMeta> implements APIMetadataRegistry<API> { export class LazyMetadataRegistry<API extends APIMeta>
implements APIMetadataRegistry<API> {
private readonly instances: { [T in keyof API]?: API[T]['metadata'] } = {}; private readonly instances: { [T in keyof API]?: API[T]['metadata'] } = {};
protected constructor( protected constructor(
private readonly builders: { readonly [T in keyof API]: (r?: APIMetadataRegistry<API>) => API[T]['metadata'] }, private readonly builders: {
readonly [T in keyof API]: (
r?: APIMetadataRegistry<API>,
) => API[T]['metadata'];
},
) {} ) {}
public has<T extends keyof API>(type: T): type is T { public has<T extends keyof API>(type: T): type is T {
...@@ -99,7 +104,10 @@ export interface APIService<API extends APIMeta> { ...@@ -99,7 +104,10 @@ export interface APIService<API extends APIMeta> {
/** /**
* Récupère une ressource par son IRI ou par son type et les paramètres de son IRI. * Récupère une ressource par son IRI ou par son type et les paramètres de son IRI.
*/ */
get<R extends Resource>(iri: IRI<R>, options?: RequestOptions): Observable<R>; get<R extends Resource>(
iri: IRI<R>,
options?: RequestOptions,
): Observable<R>;
get<T extends keyof API>( get<T extends keyof API>(
type: T, type: T,
parameters: API[T]['iriParameters'], parameters: API[T]['iriParameters'],
...@@ -109,12 +117,18 @@ export interface APIService<API extends APIMeta> { ...@@ -109,12 +117,18 @@ export interface APIService<API extends APIMeta> {
/** /**
* Récupère des ressources par leurs IRIs. * Récupère des ressources par leurs IRIs.
*/ */
getMany<R extends Resource>(iris: IRI<R>[], options?: RequestOptions): Observable<R[]>; getMany<R extends Resource>(
iris: IRI<R>[],
options?: RequestOptions,
): Observable<R[]>;
/** /**
* Génère l'IRI d'une resource à partir de son type et des paramètres d'IRI. * Génère l'IRI d'une resource à partir de son type et des paramètres d'IRI.
*/ */
generateIRI<T extends keyof API, P extends string[]>(type: T, parameters: P): IRI<any>; generateIRI<T extends keyof API, P extends string[]>(
type: T,
parameters: P,
): IRI<any>;
/** /**
* Invalide le cache pour une IRI. * Invalide le cache pour une IRI.
...@@ -137,7 +151,10 @@ export abstract class AbstractAPIService< ...@@ -137,7 +151,10 @@ export abstract class AbstractAPIService<
private readonly client: HttpClient, private readonly client: HttpClient,
) {} ) {}
public get<R extends Resource>(iri: IRI<R>, options?: RequestOptions): Observable<R>; public get<R extends Resource>(
iri: IRI<R>,
options?: RequestOptions,
): Observable<R>;
public get<T extends keyof API>( public get<T extends keyof API>(
type: T, type: T,
parameters: API[T]['iriParameters'], parameters: API[T]['iriParameters'],
...@@ -151,19 +168,29 @@ export abstract class AbstractAPIService< ...@@ -151,19 +168,29 @@ export abstract class AbstractAPIService<
): Observable<R> { ): Observable<R> {
let iri: IRI<R>; let iri: IRI<R>;
if (this.metadata.has(typeOrIRI as string)) { if (this.metadata.has(typeOrIRI as string)) {
iri = this.metadata.get(typeOrIRI as string).generateIRI(parametersOrOptions as API[T]['iriParameters']); iri = this.metadata
.get(typeOrIRI as string)
.generateIRI(parametersOrOptions as API[T]['iriParameters']);
} else { } else {
iri = typeOrIRI as IRI<R>; iri = typeOrIRI as IRI<R>;
options = parametersOrOptions as RequestOptions; options = parametersOrOptions as RequestOptions;
} }
return this.cache.get(iri, () => this.client.get<R>(iri as any, options)); return this.cache.get(iri, () =>
this.client.get<R>(iri as any, options),
);
} }
public getMany<R extends Resource>(iris: IRI<R>[], options?: RequestOptions): Observable<R[]> { public getMany<R extends Resource>(
iris: IRI<R>[],
options?: RequestOptions,
): Observable<R[]> {
return forkJoin(iris.map((iri) => this.get(iri, options))); return forkJoin(iris.map((iri) => this.get(iri, options)));
} }
public generateIRI<T extends keyof API>(type: T, parameters: API[T]['iriParameters']): IRI<API[T]['resource']> { public generateIRI<T extends keyof API>(
type: T,
parameters: API[T]['iriParameters'],
): IRI<API[T]['resource']> {
return this.metadata.get(type).generateIRI(parameters); return this.metadata.get(type).generateIRI(parameters);
} }
......
...@@ -22,17 +22,25 @@ export abstract class AbstractResourceCache { ...@@ -22,17 +22,25 @@ export abstract class AbstractResourceCache {
* Récupère une ressource par son IRI. N'exécute la requête requestFactory que si on ne dispose * Récupère une ressource par son IRI. N'exécute la requête requestFactory que si on ne dispose
* pas d'une version en cache. * pas d'une version en cache.
*/ */
public abstract get<R extends Resource>(iri: IRI<R>, requestFactory: () => Observable<R>): Observable<R>; public abstract get<R extends Resource>(
iri: IRI<R>,
requestFactory: () => Observable<R>,
): Observable<R>;
/** /**
* Met à jour une ressource existante, rafraîchit le cache local avec la réponse. * Met à jour une ressource existante, rafraîchit le cache local avec la réponse.
*/ */
public abstract put<R extends Resource>(iri: IRI<R>, request: Observable<R>): Observable<R>; public abstract put<R extends Resource>(
iri: IRI<R>,
request: Observable<R>,
): Observable<R>;
/** /**
* Crée une nouvelle ressource et met la ressource créée dans le cache. * Crée une nouvelle ressource et met la ressource créée dans le cache.
*/ */
public abstract post<R extends Resource>(request: Observable<R>): Observable<R>; public abstract post<R extends Resource>(
request: Observable<R>,
): Observable<R>;
/** /**